mirror of
https://github.com/XRPLF/clio.git
synced 2025-12-06 17:27:58 +00:00
@@ -94,6 +94,15 @@
|
||||
"log_level": "trace"
|
||||
}
|
||||
],
|
||||
"cache": {
|
||||
// Configure this to use either "num_diffs", "num_cursors_from_diff", or "num_cursors_from_account". By default, Clio uses "num_diffs".
|
||||
"num_diffs": 32, // Generate the cursors from the latest ledger diff, then use the cursors to partition the ledger to load concurrently. The cursors number is affected by the busyness of the network.
|
||||
// "num_cursors_from_diff": 3200, // Read the cursors from the diff table until we have enough cursors to partition the ledger to load concurrently.
|
||||
// "num_cursors_from_account": 3200, // Read the cursors from the account table until we have enough cursors to partition the ledger to load concurrently.
|
||||
"num_markers": 48, // The number of markers is the number of coroutines to load the cache concurrently.
|
||||
"page_fetch_size": 512, // The number of rows to load for each page.
|
||||
"load": "async" // "sync" to load cache synchronously or "async" to load cache asynchronously or "none"/"no" to turn off the cache.
|
||||
},
|
||||
"prometheus": {
|
||||
"enabled": true,
|
||||
"compress_reply": true
|
||||
|
||||
@@ -199,6 +199,19 @@ public:
|
||||
std::optional<LedgerRange>
|
||||
fetchLedgerRange() const;
|
||||
|
||||
/**
|
||||
* @brief Fetch the specified number of account root object indexes by page, the accounts need to exist for seq.
|
||||
*
|
||||
* @param number The number of accounts to fetch
|
||||
* @param pageSize The maximum number of accounts per page
|
||||
* @param seq The accounts need to exist for this sequence
|
||||
* @param yield The coroutine context
|
||||
* @return A vector of ripple::uint256 representing the account roots
|
||||
*/
|
||||
virtual std::vector<ripple::uint256>
|
||||
fetchAccountRoots(std::uint32_t number, std::uint32_t pageSize, std::uint32_t seq, boost::asio::yield_context yield)
|
||||
const = 0;
|
||||
|
||||
/**
|
||||
* @brief Updates the range of sequences that are stored in the DB.
|
||||
*
|
||||
|
||||
@@ -40,6 +40,7 @@
|
||||
#include <ripple/basics/base_uint.h>
|
||||
#include <ripple/basics/strHex.h>
|
||||
#include <ripple/protocol/AccountID.h>
|
||||
#include <ripple/protocol/Indexes.h>
|
||||
#include <ripple/protocol/LedgerHeader.h>
|
||||
#include <ripple/protocol/nft.h>
|
||||
|
||||
@@ -693,6 +694,50 @@ public:
|
||||
return results;
|
||||
}
|
||||
|
||||
std::vector<ripple::uint256>
|
||||
fetchAccountRoots(std::uint32_t number, std::uint32_t pageSize, std::uint32_t seq, boost::asio::yield_context yield)
|
||||
const override
|
||||
{
|
||||
std::vector<ripple::uint256> liveAccounts;
|
||||
std::optional<ripple::AccountID> lastItem;
|
||||
|
||||
while (liveAccounts.size() < number) {
|
||||
Statement statement = lastItem ? schema_->selectAccountFromToken.bind(*lastItem, Limit{pageSize})
|
||||
: schema_->selectAccountFromBegining.bind(Limit{pageSize});
|
||||
|
||||
auto const res = executor_.read(yield, statement);
|
||||
if (res) {
|
||||
auto const& results = res.value();
|
||||
if (not results.hasRows()) {
|
||||
LOG(log_.debug()) << "No rows returned";
|
||||
break;
|
||||
}
|
||||
// The results should not contain duplicates, we just filter out deleted accounts
|
||||
std::vector<ripple::uint256> fullAccounts;
|
||||
for (auto [account] : extract<ripple::AccountID>(results)) {
|
||||
fullAccounts.push_back(ripple::keylet::account(account).key);
|
||||
lastItem = account;
|
||||
}
|
||||
auto const objs = doFetchLedgerObjects(fullAccounts, seq, yield);
|
||||
|
||||
for (auto i = 0u; i < fullAccounts.size(); i++) {
|
||||
if (not objs[i].empty()) {
|
||||
if (liveAccounts.size() < number) {
|
||||
liveAccounts.push_back(fullAccounts[i]);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOG(log_.error()) << "Could not fetch account from account_tx: " << res.error();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return liveAccounts;
|
||||
}
|
||||
|
||||
std::vector<LedgerObject>
|
||||
fetchLedgerDiff(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
|
||||
{
|
||||
|
||||
@@ -586,6 +586,32 @@ public:
|
||||
));
|
||||
}();
|
||||
|
||||
PreparedStatement selectAccountFromBegining = [this]() {
|
||||
return handle_.get().prepare(fmt::format(
|
||||
R"(
|
||||
SELECT account
|
||||
FROM {}
|
||||
WHERE token(account) > 0
|
||||
PER PARTITION LIMIT 1
|
||||
LIMIT ?
|
||||
)",
|
||||
qualifiedTableName(settingsProvider_.get(), "account_tx")
|
||||
));
|
||||
}();
|
||||
|
||||
PreparedStatement selectAccountFromToken = [this]() {
|
||||
return handle_.get().prepare(fmt::format(
|
||||
R"(
|
||||
SELECT account
|
||||
FROM {}
|
||||
WHERE token(account) > token(?)
|
||||
PER PARTITION LIMIT 1
|
||||
LIMIT ?
|
||||
)",
|
||||
qualifiedTableName(settingsProvider_.get(), "account_tx")
|
||||
));
|
||||
}();
|
||||
|
||||
PreparedStatement selectAccountTxForward = [this]() {
|
||||
return handle_.get().prepare(fmt::format(
|
||||
R"(
|
||||
|
||||
@@ -22,6 +22,9 @@
|
||||
#include "data/BackendInterface.hpp"
|
||||
#include "etl/CacheLoaderSettings.hpp"
|
||||
#include "etl/impl/CacheLoader.hpp"
|
||||
#include "etl/impl/CursorFromAccountProvider.hpp"
|
||||
#include "etl/impl/CursorFromDiffProvider.hpp"
|
||||
#include "etl/impl/CursorFromFixDiffNumProvider.hpp"
|
||||
#include "util/Assert.hpp"
|
||||
#include "util/async/context/BasicExecutionContext.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
@@ -41,10 +44,7 @@ namespace etl {
|
||||
* @tparam CursorProviderType The type of the cursor provider to use
|
||||
* @tparam ExecutionContextType The type of the execution context to use
|
||||
*/
|
||||
template <
|
||||
typename CacheType,
|
||||
typename CursorProviderType = impl::CursorProvider,
|
||||
typename ExecutionContextType = util::async::CoroExecutionContext>
|
||||
template <typename CacheType, typename ExecutionContextType = util::async::CoroExecutionContext>
|
||||
class CacheLoader {
|
||||
using CacheLoaderType = impl::CacheLoaderImpl<CacheType>;
|
||||
|
||||
@@ -88,7 +88,22 @@ public:
|
||||
return;
|
||||
}
|
||||
|
||||
auto const provider = CursorProviderType{backend_, settings_.numCacheDiffs};
|
||||
std::shared_ptr<impl::BaseCursorProvider> provider;
|
||||
if (settings_.numCacheCursorsFromDiff != 0) {
|
||||
LOG(log_.info()) << "Loading cache with cursor from num_cursors_from_diff="
|
||||
<< settings_.numCacheCursorsFromDiff;
|
||||
provider = std::make_shared<impl::CursorFromDiffProvider>(backend_, settings_.numCacheCursorsFromDiff);
|
||||
} else if (settings_.numCacheCursorsFromAccount != 0) {
|
||||
LOG(log_.info()) << "Loading cache with cursor from num_cursors_from_account="
|
||||
<< settings_.numCacheCursorsFromAccount;
|
||||
provider = std::make_shared<impl::CursorFromAccountProvider>(
|
||||
backend_, settings_.numCacheCursorsFromAccount, settings_.cachePageFetchSize
|
||||
);
|
||||
} else {
|
||||
LOG(log_.info()) << "Loading cache with cursor from num_diffs=" << settings_.numCacheDiffs;
|
||||
provider = std::make_shared<impl::CursorFromFixDiffNumProvider>(backend_, settings_.numCacheDiffs);
|
||||
}
|
||||
|
||||
loader_ = std::make_unique<CacheLoaderType>(
|
||||
ctx_,
|
||||
backend_,
|
||||
@@ -96,7 +111,7 @@ public:
|
||||
seq,
|
||||
settings_.numCacheMarkers,
|
||||
settings_.cachePageFetchSize,
|
||||
provider.getCursors(seq)
|
||||
provider->getCursors(seq)
|
||||
);
|
||||
|
||||
if (settings_.isSync()) {
|
||||
|
||||
@@ -53,7 +53,13 @@ make_CacheLoaderSettings(util::Config const& config)
|
||||
settings.numThreads = config.valueOr("io_threads", settings.numThreads);
|
||||
if (config.contains("cache")) {
|
||||
auto const cache = config.section("cache");
|
||||
// Given diff number to generate cursors
|
||||
settings.numCacheDiffs = cache.valueOr<size_t>("num_diffs", settings.numCacheDiffs);
|
||||
// Given cursors number fetching from diff
|
||||
settings.numCacheCursorsFromDiff = cache.valueOr<size_t>("num_cursors_from_diff", 0);
|
||||
// Given cursors number fetching from account
|
||||
settings.numCacheCursorsFromAccount = cache.valueOr<size_t>("num_cursors_from_account", 0);
|
||||
|
||||
settings.numCacheMarkers = cache.valueOr<size_t>("num_markers", settings.numCacheMarkers);
|
||||
settings.cachePageFetchSize = cache.valueOr<size_t>("page_fetch_size", settings.cachePageFetchSize);
|
||||
|
||||
|
||||
@@ -32,10 +32,12 @@ struct CacheLoaderSettings {
|
||||
/** @brief Ways to load the cache */
|
||||
enum class LoadStyle { ASYNC, SYNC, NONE };
|
||||
|
||||
size_t numCacheDiffs = 32; /**< number of diffs to use to generate cursors */
|
||||
size_t numCacheMarkers = 48; /**< number of markers to use at one time to traverse the ledger */
|
||||
size_t cachePageFetchSize = 512; /**< number of ledger objects to fetch concurrently per marker */
|
||||
size_t numThreads = 2; /**< number of threads to use for loading cache */
|
||||
size_t numCacheDiffs = 32; /**< number of diffs to use to generate cursors */
|
||||
size_t numCacheMarkers = 48; /**< number of markers to use at one time to traverse the ledger */
|
||||
size_t cachePageFetchSize = 512; /**< number of ledger objects to fetch concurrently per marker */
|
||||
size_t numThreads = 2; /**< number of threads to use for loading cache */
|
||||
size_t numCacheCursorsFromDiff = 0; /**< number of cursors to fetch from diff */
|
||||
size_t numCacheCursorsFromAccount = 0; /**< number of cursors to fetch from account_tx */
|
||||
|
||||
LoadStyle loadStyle = LoadStyle::ASYNC; /**< how to load the cache */
|
||||
|
||||
|
||||
@@ -29,3 +29,44 @@ trying to write a record with a key that already exists. ETL uses this error to
|
||||
determine that another process is writing to the database, and subsequently
|
||||
falls back to a soft read-only mode. clio can also operate in strict
|
||||
read-only mode, in which case they will never write to the database.
|
||||
|
||||
## Ledger cache
|
||||
To efficiently reduce database load and improve RPC performance, we maintain a ledger cache in memory. The cache stores all entities of the latest ledger as a map of index to object, and is updated whenever a new ledger is validated.
|
||||
|
||||
The successor table stores each ledger's object indexes as a linked list
|
||||
,
|
||||
which is used by cache loader to load the all ledger objects belonging to the latest ledger to memory concurrently.
|
||||
The head of the linked list is data::firstKey(**0x0000000000000000000000000000000000000000000000000000000000000000**), and the tail is data::lastKey(**0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF**). The linked list is partitioned into
|
||||
multiple segments by cursors and each segment will be picked by a coroutine to load. There are total `cache.num_markers` coroutines to load the ledger objects concurrently. A coroutine will pick a segment from a queue and load it with the step of `cache.page_fetch_size` until the queue is empty.
|
||||
|
||||
|
||||
For example, if segment
|
||||
(**0x08581464C55B0B2C8C4FA27FA8DE0ED695D3BE019E7BE0969C925F868FE27A51-0x08A67682E62229DA4D597D308C8F028ECF47962D5068A78802E22B258DC25D22**)
|
||||
is assigned to a coroutine, the coroutine will load the ledger objects from index
|
||||
**0x08581464C55B0B2C8C4FA27FA8DE0ED695D3BE019E7BE0969C925F868FE27A51** to
|
||||
**0x08A67682E62229DA4D597D308C8F028ECF47962D5068A78802E22B258DC25D22**. The coroutine will request `cache.page_fetch_size`
|
||||
objects from database each time until reach the end of the segment. After the coroutine finishes loading this
|
||||
segment, it will fetch the next segment in queue and repeat.
|
||||
|
||||
Because of the nature of the linked list, the cursors are crucial to balance the workload of each
|
||||
coroutine. We have 3 types of cursor generation can be used:
|
||||
|
||||
- **cache.num_diffs**
|
||||
Cursors will be generated by the changed objects in the latest `cache.num_diffs` number of ledgers. The default value is 32. In mainnet, this type works well because the network is stablily busy and the number of changed objects in each ledger is relatively stable. We are able to get enough cursors after removing the deleted objects on mainnet. For other networks, like the devnet, the number of changed objects in each ledger is not stable. When the network is slient, one cotoutine may load a large number of objects while the other coroutines are idel.
|
||||
Here is the comparison of the number of cursors and loading time on devnet:
|
||||
|
||||
| Cursors | Loading time /seconds |
|
||||
| --- | --- |
|
||||
| 11 | 2072 |
|
||||
| 33 | 983 |
|
||||
| 120 | 953 |
|
||||
| 200 | 843 |
|
||||
| 250 | 816 |
|
||||
| 500 | 792 |
|
||||
|
||||
- **cache.num_cursors_from_diff**
|
||||
Cursors will be generated by the changed objects in the recent ledgers. The generator will keep reading the previous ledger until we have `cache.num_cursors_from_diff` cursors. The type is the evolved version of the first type. It removes the network busyness factor and only considers the number of cursors. The cache loading can be well tuned by this configuration.
|
||||
|
||||
-- **cache.num_cursors_from_account**
|
||||
If the server does not have enough historical ledgers, another option is to generate the cursors by the account. The generator will keep reading accounts from account_tx table until we have `cache.num_cursors_from_account` cursors.
|
||||
|
||||
|
||||
38
src/etl/impl/BaseCursorProvider.hpp
Normal file
38
src/etl/impl/BaseCursorProvider.hpp
Normal file
@@ -0,0 +1,38 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2024, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <ripple/basics/base_uint.h>
|
||||
|
||||
#include <cstdint>
|
||||
#include <vector>
|
||||
namespace etl::impl {
|
||||
|
||||
struct CursorPair {
|
||||
ripple::uint256 start;
|
||||
ripple::uint256 end;
|
||||
};
|
||||
|
||||
struct BaseCursorProvider {
|
||||
[[nodiscard]] std::vector<CursorPair> virtual getCursors(uint32_t seq) const = 0;
|
||||
virtual ~BaseCursorProvider() = default;
|
||||
};
|
||||
|
||||
} // namespace etl::impl
|
||||
@@ -21,7 +21,7 @@
|
||||
|
||||
#include "data/BackendInterface.hpp"
|
||||
#include "etl/ETLHelpers.hpp"
|
||||
#include "etl/impl/CursorProvider.hpp"
|
||||
#include "etl/impl/BaseCursorProvider.hpp"
|
||||
#include "util/async/AnyExecutionContext.hpp"
|
||||
#include "util/async/AnyOperation.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
@@ -38,7 +38,6 @@
|
||||
#include <cstdint>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <ranges>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
84
src/etl/impl/CursorFromAccountProvider.hpp
Normal file
84
src/etl/impl/CursorFromAccountProvider.hpp
Normal file
@@ -0,0 +1,84 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2024, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "data/BackendInterface.hpp"
|
||||
#include "data/Types.hpp"
|
||||
#include "etl/impl/BaseCursorProvider.hpp"
|
||||
|
||||
#include <ripple/basics/base_uint.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <iterator>
|
||||
#include <memory>
|
||||
#include <set>
|
||||
#include <vector>
|
||||
|
||||
namespace etl::impl {
|
||||
|
||||
class CursorFromAccountProvider : public BaseCursorProvider {
|
||||
std::shared_ptr<BackendInterface> backend_;
|
||||
size_t numCursors_;
|
||||
size_t pageSize_;
|
||||
|
||||
public:
|
||||
CursorFromAccountProvider(std::shared_ptr<BackendInterface> const& backend, size_t numCursors, size_t pageSize)
|
||||
: backend_{backend}, numCursors_{numCursors}, pageSize_{pageSize}
|
||||
{
|
||||
}
|
||||
|
||||
[[nodiscard]] std::vector<CursorPair>
|
||||
getCursors(uint32_t const seq) const override
|
||||
{
|
||||
namespace rg = std::ranges;
|
||||
|
||||
auto accountRoots = [this, seq]() {
|
||||
return data::synchronousAndRetryOnTimeout([this, seq](auto yield) {
|
||||
return backend_->fetchAccountRoots(numCursors_, pageSize_, seq, yield);
|
||||
});
|
||||
}();
|
||||
|
||||
rg::sort(accountRoots);
|
||||
std::vector<ripple::uint256> cursors{data::firstKey};
|
||||
rg::copy(accountRoots.begin(), accountRoots.end(), std::back_inserter(cursors));
|
||||
rg::sort(cursors);
|
||||
cursors.push_back(data::lastKey);
|
||||
|
||||
std::vector<CursorPair> pairs;
|
||||
pairs.reserve(cursors.size());
|
||||
|
||||
// FIXME: this should be `cursors | vs::pairwise` (C++23)
|
||||
std::transform(
|
||||
std::begin(cursors),
|
||||
std::prev(std::end(cursors)),
|
||||
std::next(std::begin(cursors)),
|
||||
std::back_inserter(pairs),
|
||||
[](auto&& a, auto&& b) -> CursorPair {
|
||||
return {a, b};
|
||||
}
|
||||
);
|
||||
|
||||
return pairs;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace etl::impl
|
||||
111
src/etl/impl/CursorFromDiffProvider.hpp
Normal file
111
src/etl/impl/CursorFromDiffProvider.hpp
Normal file
@@ -0,0 +1,111 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2024, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "data/BackendInterface.hpp"
|
||||
#include "data/Types.hpp"
|
||||
#include "etl/impl/BaseCursorProvider.hpp"
|
||||
#include "util/Assert.hpp"
|
||||
|
||||
#include <ripple/basics/base_uint.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <iterator>
|
||||
#include <memory>
|
||||
#include <ranges>
|
||||
#include <set>
|
||||
#include <vector>
|
||||
|
||||
namespace etl::impl {
|
||||
|
||||
class CursorFromDiffProvider : public BaseCursorProvider {
|
||||
std::shared_ptr<BackendInterface> backend_;
|
||||
size_t numCursors_;
|
||||
|
||||
public:
|
||||
CursorFromDiffProvider(std::shared_ptr<BackendInterface> const& backend, size_t numCursors)
|
||||
: backend_{backend}, numCursors_{numCursors}
|
||||
{
|
||||
}
|
||||
|
||||
[[nodiscard]] std::vector<CursorPair>
|
||||
getCursors(uint32_t const seq) const override
|
||||
{
|
||||
namespace rg = std::ranges;
|
||||
namespace vs = std::views;
|
||||
|
||||
auto const fetchDiff = [this, seq](uint32_t offset) {
|
||||
return data::synchronousAndRetryOnTimeout([this, seq, offset](auto yield) {
|
||||
return backend_->fetchLedgerDiff(seq - offset, yield);
|
||||
});
|
||||
};
|
||||
|
||||
auto const range = backend_->fetchLedgerRange();
|
||||
ASSERT(range.has_value(), "Ledger range is not available when cache is loading");
|
||||
|
||||
std::set<ripple::uint256> liveCursors;
|
||||
std::set<ripple::uint256> deletedCursors;
|
||||
auto i = 0;
|
||||
while (liveCursors.size() < numCursors_ and seq - i >= range->minSequence) {
|
||||
auto diffs = fetchDiff(i++);
|
||||
rg::copy(
|
||||
diffs //
|
||||
| vs::filter([&deletedCursors](auto const& obj) {
|
||||
return not obj.blob.empty() and !deletedCursors.contains(obj.key);
|
||||
}) //
|
||||
| vs::transform([](auto const& obj) { return obj.key; }),
|
||||
std::inserter(liveCursors, std::begin(liveCursors))
|
||||
);
|
||||
|
||||
// track the deleted objects
|
||||
rg::copy(
|
||||
diffs //
|
||||
| vs::filter([](auto const& obj) { return obj.blob.empty(); }) //
|
||||
| vs::transform([](auto const& obj) { return obj.key; }),
|
||||
std::inserter(deletedCursors, std::begin(deletedCursors))
|
||||
);
|
||||
}
|
||||
|
||||
std::vector<ripple::uint256> cursors{data::firstKey};
|
||||
rg::copy(liveCursors | vs::take(std::min(liveCursors.size(), numCursors_)), std::back_inserter(cursors));
|
||||
rg::sort(cursors);
|
||||
cursors.push_back(data::lastKey);
|
||||
|
||||
std::vector<CursorPair> pairs;
|
||||
pairs.reserve(cursors.size());
|
||||
|
||||
// FIXME: this should be `cursors | vs::pairwise` (C++23)
|
||||
std::transform(
|
||||
std::begin(cursors),
|
||||
std::prev(std::end(cursors)),
|
||||
std::next(std::begin(cursors)),
|
||||
std::back_inserter(pairs),
|
||||
[](auto&& a, auto&& b) -> CursorPair {
|
||||
return {a, b};
|
||||
}
|
||||
);
|
||||
|
||||
return pairs;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace etl::impl
|
||||
@@ -21,7 +21,7 @@
|
||||
|
||||
#include "data/BackendInterface.hpp"
|
||||
#include "data/Types.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
#include "etl/impl/BaseCursorProvider.hpp"
|
||||
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
#include <boost/asio/io_context.hpp>
|
||||
@@ -39,25 +39,19 @@
|
||||
|
||||
namespace etl::impl {
|
||||
|
||||
struct CursorPair {
|
||||
ripple::uint256 start;
|
||||
ripple::uint256 end;
|
||||
};
|
||||
|
||||
class CursorProvider {
|
||||
util::Logger log_{"ETL"};
|
||||
class CursorFromFixDiffNumProvider : public BaseCursorProvider {
|
||||
std::shared_ptr<BackendInterface> backend_;
|
||||
|
||||
size_t numDiffs_;
|
||||
|
||||
public:
|
||||
CursorProvider(std::shared_ptr<BackendInterface> const& backend, size_t numDiffs)
|
||||
CursorFromFixDiffNumProvider(std::shared_ptr<BackendInterface> const& backend, size_t numDiffs)
|
||||
: backend_{backend}, numDiffs_{numDiffs}
|
||||
{
|
||||
}
|
||||
|
||||
[[nodiscard]] std::vector<CursorPair>
|
||||
getCursors(uint32_t const seq) const
|
||||
getCursors(uint32_t const seq) const override
|
||||
{
|
||||
namespace rg = std::ranges;
|
||||
namespace vs = std::views;
|
||||
@@ -17,7 +17,9 @@ target_sources(
|
||||
etl/AmendmentBlockHandlerTests.cpp
|
||||
etl/CacheLoaderSettingsTests.cpp
|
||||
etl/CacheLoaderTests.cpp
|
||||
etl/CursorProviderTests.cpp
|
||||
etl/CursorFromAccountProviderTests.cpp
|
||||
etl/CursorFromDiffProviderTests.cpp
|
||||
etl/CursorFromFixDiffNumProviderTests.cpp
|
||||
etl/ETLStateTests.cpp
|
||||
etl/ExtractionDataPipeTests.cpp
|
||||
etl/ExtractorTests.cpp
|
||||
|
||||
@@ -118,7 +118,7 @@ TEST_P(ParametrizedCacheLoaderTest, LoadCacheWithDifferentSettings)
|
||||
EXPECT_CALL(cache, setFull).Times(1);
|
||||
|
||||
async::CoroExecutionContext ctx{settings.numThreads};
|
||||
etl::impl::CursorProvider const provider{backend, settings.numCacheDiffs};
|
||||
etl::impl::CursorFromFixDiffNumProvider const provider{backend, settings.numCacheDiffs};
|
||||
|
||||
etl::impl::CacheLoaderImpl<MockCache> loader{
|
||||
ctx, backend, cache, SEQ, settings.numCacheMarkers, settings.cachePageFetchSize, provider.getCursors(SEQ)
|
||||
@@ -146,7 +146,7 @@ TEST_P(ParametrizedCacheLoaderTest, AutomaticallyCancelledAndAwaitedInDestructor
|
||||
EXPECT_CALL(cache, setFull).Times(AtMost(1));
|
||||
|
||||
async::CoroExecutionContext ctx{settings.numThreads};
|
||||
etl::impl::CursorProvider const provider{backend, settings.numCacheDiffs};
|
||||
etl::impl::CursorFromFixDiffNumProvider const provider{backend, settings.numCacheDiffs};
|
||||
|
||||
etl::impl::CacheLoaderImpl<MockCache> const loader{
|
||||
ctx, backend, cache, SEQ, settings.numCacheMarkers, settings.cachePageFetchSize, provider.getCursors(SEQ)
|
||||
|
||||
68
unittests/etl/CursorFromAccountProviderTests.cpp
Normal file
68
unittests/etl/CursorFromAccountProviderTests.cpp
Normal file
@@ -0,0 +1,68 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2024, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include "data/Types.hpp"
|
||||
#include "etl/impl/CursorFromAccountProvider.hpp"
|
||||
#include "util/Fixtures.hpp"
|
||||
|
||||
#include <gmock/gmock.h>
|
||||
#include <gtest/gtest.h>
|
||||
#include <ripple/basics/base_uint.h>
|
||||
|
||||
#include <vector>
|
||||
|
||||
using namespace etl;
|
||||
using namespace util;
|
||||
using namespace data;
|
||||
using namespace testing;
|
||||
|
||||
namespace {
|
||||
|
||||
constexpr auto SEQ = 30;
|
||||
|
||||
std::vector<ripple::uint256> const ACCOUNTROOTS = {
|
||||
ripple::uint256{"05E1EAC2574BE082B00B16F907CE32E6058DEB8F9E81CF34A00E80A5D71FA4FE"},
|
||||
ripple::uint256{"110872C7196EE6EF7032952F1852B11BB461A96FF2D7E06A8003B4BB30FD130B"},
|
||||
ripple::uint256{"3B3A84E850C724E914293271785A31D0BFC8B9DD1B6332E527B149AD72E80E18"},
|
||||
ripple::uint256{"4EC98C5C3F34C44409BC058998CBD64F6AED3FF6C0CAAEC15F7F42DF14EE9F04"},
|
||||
ripple::uint256{"58CEC9F17733EA7BA68C88E6179B8F207D001EE04D4E0366F958CC04FF6AB834"},
|
||||
ripple::uint256{"64FB1712146BA604C274CC335C5DE7ADFE52D1F8C3E904A9F9765FE8158A3E01"},
|
||||
ripple::uint256{"700BE23B1D9EE3E6BF52543D05843D5345B85D9EDB3D33BBD6B4C3A13C54B38E"},
|
||||
ripple::uint256{"82C297FCBCD634C4424F263D17480AA2F13975DF5846A5BB57246022CEEBE441"},
|
||||
ripple::uint256{"A2AA4C212DC2CA2C49BF58805F7C63363BC981018A01AC9609A7CBAB2A02CEDF"},
|
||||
};
|
||||
|
||||
struct CursorFromAccountProviderTests : MockBackendTestNaggy {};
|
||||
} // namespace
|
||||
|
||||
TEST_F(CursorFromAccountProviderTests, EnoughAccountRoots)
|
||||
{
|
||||
auto const numCursors = 9;
|
||||
auto const pageSize = 100;
|
||||
auto const provider = etl::impl::CursorFromAccountProvider{backend, numCursors, pageSize};
|
||||
|
||||
ON_CALL(*backend, fetchAccountRoots(numCursors, _, SEQ, _)).WillByDefault(Return(ACCOUNTROOTS));
|
||||
EXPECT_CALL(*backend, fetchAccountRoots(_, _, _, _)).Times(1);
|
||||
|
||||
auto const cursors = provider.getCursors(SEQ);
|
||||
ASSERT_EQ(cursors.size(), numCursors + 1);
|
||||
|
||||
EXPECT_EQ(cursors.front().start, firstKey);
|
||||
EXPECT_EQ(cursors.back().end, lastKey);
|
||||
}
|
||||
107
unittests/etl/CursorFromDiffProviderTests.cpp
Normal file
107
unittests/etl/CursorFromDiffProviderTests.cpp
Normal file
@@ -0,0 +1,107 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2024, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include "data/Types.hpp"
|
||||
#include "etl/impl/CursorFromDiffProvider.hpp"
|
||||
#include "util/Fixtures.hpp"
|
||||
|
||||
#include <gmock/gmock.h>
|
||||
#include <gtest/gtest.h>
|
||||
#include <ripple/basics/base_uint.h>
|
||||
|
||||
#include <vector>
|
||||
|
||||
using namespace etl;
|
||||
using namespace util;
|
||||
using namespace data;
|
||||
using namespace testing;
|
||||
|
||||
namespace {
|
||||
|
||||
constexpr auto SEQ = 30;
|
||||
|
||||
std::vector<data::LedgerObject> const DIFFS_FOR_SEQ = {
|
||||
{.key = ripple::uint256{"05E1EAC2574BE082B00B16F907CE32E6058DEB8F9E81CF34A00E80A5D71FA4FE"}, .blob = Blob{}
|
||||
}, // This object is removed in SEQ while it exists in SEQ-1
|
||||
{.key = ripple::uint256{"110872C7196EE6EF7032952F1852B11BB461A96FF2D7E06A8003B4BB30FD130B"}, .blob = Blob{'s'}},
|
||||
{.key = ripple::uint256{"3B3A84E850C724E914293271785A31D0BFC8B9DD1B6332E527B149AD72E80E18"}, .blob = Blob{'s'}},
|
||||
{.key = ripple::uint256{"4EC98C5C3F34C44409BC058998CBD64F6AED3FF6C0CAAEC15F7F42DF14EE9F04"}, .blob = Blob{'s'}},
|
||||
{.key = ripple::uint256{"58CEC9F17733EA7BA68C88E6179B8F207D001EE04D4E0366F958CC04FF6AB834"}, .blob = Blob{'s'}},
|
||||
{.key = ripple::uint256{"64FB1712146BA604C274CC335C5DE7ADFE52D1F8C3E904A9F9765FE8158A3E01"}, .blob = Blob{'s'}},
|
||||
{.key = ripple::uint256{"700BE23B1D9EE3E6BF52543D05843D5345B85D9EDB3D33BBD6B4C3A13C54B38E"}, .blob = Blob{'s'}},
|
||||
{.key = ripple::uint256{"82C297FCBCD634C4424F263D17480AA2F13975DF5846A5BB57246022CEEBE441"}, .blob = Blob{'s'}},
|
||||
{.key = ripple::uint256{"A2AA4C212DC2CA2C49BF58805F7C63363BC981018A01AC9609A7CBAB2A02CEDF"}, .blob = Blob{'s'}},
|
||||
{.key = ripple::uint256{"BC0DAE09C0BFBC4A49AA94B849266588BFD6E1F554B184B5788AC55D6E07EB95"}, .blob = Blob{'s'}},
|
||||
{.key = ripple::uint256{"DCC8759A35CB946511763AA5553A82AA25F20B901C98C9BB74D423BCFAFF5F9D"}, .blob = Blob{'s'}},
|
||||
};
|
||||
|
||||
std::vector<data::LedgerObject> const DIFFS_FOR_SEQ_MINUS1 = {
|
||||
{.key = ripple::uint256{"05E1EAC2574BE082B00B16F907CE32E6058DEB8F9E81CF34A00E80A5D71FA4FE"}, .blob = Blob{'s'}},
|
||||
{.key = ripple::uint256{"110872C7196EE6EF7032952F1852B11BB461A96FF2D7E06A8003B4BB30FD1301"}, .blob = Blob{'s'}},
|
||||
{.key = ripple::uint256{"3B3A84E850C724E914293271785A31D0BFC8B9DD1B6332E527B149AD72E80E12"}, .blob = Blob{'s'}},
|
||||
{.key = ripple::uint256{"4EC98C5C3F34C44409BC058998CBD64F6AED3FF6C0CAAEC15F7F42DF14EE9F03"}, .blob = Blob{'s'}},
|
||||
{.key = ripple::uint256{"58CEC9F17733EA7BA68C88E6179B8F207D001EE04D4E0366F958CC04FF6AB834"}, .blob = Blob{'s'}
|
||||
}, // This object is changed in both SEQ and SEQ-1
|
||||
{.key = ripple::uint256{"64FB1712146BA604C274CC335C5DE7ADFE52D1F8C3E904A9F9765FE8158A3E05"}, .blob = Blob{'s'}},
|
||||
{.key = ripple::uint256{"700BE23B1D9EE3E6BF52543D05843D5345B85D9EDB3D33BBD6B4C3A13C54B386"}, .blob = Blob{'s'}},
|
||||
{.key = ripple::uint256{"82C297FCBCD634C4424F263D17480AA2F13975DF5846A5BB57246022CEEBE447"}, .blob = Blob{'s'}},
|
||||
{.key = ripple::uint256{"A2AA4C212DC2CA2C49BF58805F7C63363BC981018A01AC9609A7CBAB2A02CED8"}, .blob = Blob{'s'}},
|
||||
{.key = ripple::uint256{"BC0DAE09C0BFBC4A49AA94B849266588BFD6E1F554B184B5788AC55D6E07EB99"}, .blob = Blob{'s'}},
|
||||
{.key = ripple::uint256{"DCC8759A35CB946511763AA5553A82AA25F20B901C98C9BB74D423BCFAFF5F90"}, .blob = Blob{'s'}},
|
||||
};
|
||||
|
||||
struct CursorFromDiffProviderTests : MockBackendTestNaggy {};
|
||||
} // namespace
|
||||
|
||||
TEST_F(CursorFromDiffProviderTests, MultipleDiffs)
|
||||
{
|
||||
auto const numCursors = 15;
|
||||
auto const provider = etl::impl::CursorFromDiffProvider{backend, numCursors};
|
||||
|
||||
backend->setRange(SEQ - 10, SEQ);
|
||||
ON_CALL(*backend, fetchLedgerDiff(SEQ, _)).WillByDefault(Return(DIFFS_FOR_SEQ));
|
||||
ON_CALL(*backend, fetchLedgerDiff(SEQ - 1, _)).WillByDefault(Return(DIFFS_FOR_SEQ_MINUS1));
|
||||
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).Times(2);
|
||||
|
||||
auto const cursors = provider.getCursors(SEQ);
|
||||
ASSERT_EQ(cursors.size(), numCursors + 1);
|
||||
|
||||
EXPECT_EQ(cursors.front().start, firstKey);
|
||||
EXPECT_EQ(cursors.back().end, lastKey);
|
||||
}
|
||||
|
||||
TEST_F(CursorFromDiffProviderTests, NotEnoughDiffs)
|
||||
{
|
||||
auto const numCursors = 35;
|
||||
auto const provider = etl::impl::CursorFromDiffProvider{backend, numCursors};
|
||||
auto const AVAILABLE_DIFFS = 10;
|
||||
backend->setRange(SEQ - AVAILABLE_DIFFS + 1, SEQ);
|
||||
ON_CALL(*backend, fetchLedgerDiff(_, _)).WillByDefault(Return(std::vector<data::LedgerObject>{}));
|
||||
ON_CALL(*backend, fetchLedgerDiff(SEQ, _)).WillByDefault(Return(DIFFS_FOR_SEQ));
|
||||
ON_CALL(*backend, fetchLedgerDiff(SEQ - 1, _)).WillByDefault(Return(DIFFS_FOR_SEQ_MINUS1));
|
||||
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).Times(AVAILABLE_DIFFS);
|
||||
|
||||
auto const cursors = provider.getCursors(SEQ);
|
||||
auto const removed = 2; // lost 2 objects because it is removed.
|
||||
auto const repeated = 1; // repeated 1 object
|
||||
ASSERT_EQ(cursors.size(), DIFFS_FOR_SEQ.size() + DIFFS_FOR_SEQ_MINUS1.size() - removed - repeated + 1);
|
||||
|
||||
EXPECT_EQ(cursors.front().start, firstKey);
|
||||
EXPECT_EQ(cursors.back().end, lastKey);
|
||||
}
|
||||
@@ -18,7 +18,7 @@
|
||||
//==============================================================================
|
||||
|
||||
#include "data/Types.hpp"
|
||||
#include "etl/impl/CursorProvider.hpp"
|
||||
#include "etl/impl/CursorFromFixDiffNumProvider.hpp"
|
||||
#include "etl/impl/FakeDiffProvider.hpp"
|
||||
#include "util/Fixtures.hpp"
|
||||
#include "util/MockPrometheus.hpp"
|
||||
@@ -58,7 +58,7 @@ TEST_P(ParametrizedCursorProviderTest, GetCursorsWithDifferentProviderSettings)
|
||||
{
|
||||
auto const numDiffs = GetParam();
|
||||
auto const diffs = diffProvider.getLatestDiff();
|
||||
auto const provider = etl::impl::CursorProvider{backend, numDiffs};
|
||||
auto const provider = etl::impl::CursorFromFixDiffNumProvider{backend, numDiffs};
|
||||
|
||||
ON_CALL(*backend, fetchLedgerDiff(_, _)).WillByDefault(Return(diffs));
|
||||
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).Times(numDiffs);
|
||||
@@ -73,7 +73,7 @@ TEST_P(ParametrizedCursorProviderTest, GetCursorsWithDifferentProviderSettings)
|
||||
TEST_F(CursorProviderTest, EmptyCursorIsHandledCorrectly)
|
||||
{
|
||||
auto const diffs = diffProvider.getLatestDiff();
|
||||
auto const provider = etl::impl::CursorProvider{backend, 0};
|
||||
auto const provider = etl::impl::CursorFromFixDiffNumProvider{backend, 0};
|
||||
|
||||
ON_CALL(*backend, fetchLedgerDiff(_, _)).WillByDefault(Return(diffs));
|
||||
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).Times(0);
|
||||
@@ -140,6 +140,13 @@ struct MockBackend : public BackendInterface {
|
||||
(const, override)
|
||||
);
|
||||
|
||||
MOCK_METHOD(
|
||||
std::vector<ripple::uint256>,
|
||||
fetchAccountRoots,
|
||||
(std::uint32_t, std::uint32_t, std::uint32_t, boost::asio::yield_context),
|
||||
(const, override)
|
||||
);
|
||||
|
||||
MOCK_METHOD(
|
||||
std::optional<Blob>,
|
||||
doFetchLedgerObject,
|
||||
|
||||
Reference in New Issue
Block a user