Compare commits

..

24 Commits

Author SHA1 Message Date
Alex Kremer
7b043025e8 chore: Commits for 2.4.0-b3 (#1876) 2025-02-06 11:34:21 +00:00
cyan317
67c989081d fix clang-tidy issues (#1871) 2025-02-03 12:00:59 +00:00
github-actions[bot]
2fd16cd582 style: clang-tidy auto fixes (#1868)
Fixes #1867. Please review and commit clang-tidy fixes.

Co-authored-by: kuznetsss <15742918+kuznetsss@users.noreply.github.com>
2025-02-03 08:43:26 +00:00
Sergey Kuznetsov
89af8fe500 feat: Permissioned domains (#1841)
Fixes #1833.
2025-01-31 15:30:34 +00:00
cyan317
1753c95910 feat: Support Dynamic NFT (#1525)
Fix #1471
Clio's changes for supporting DNFT
https://github.com/XRPLF/rippled/pull/5048/files
2025-01-31 13:33:20 +00:00
Maria Shodunke
e7702e9c11 docs: Move metrics and static analysis docs (#1864)
Fixes #1219.
2025-01-31 11:37:27 +00:00
github-actions[bot]
e549657766 style: clang-tidy auto fixes (#1863)
Fixes #1862. Please review and commit clang-tidy fixes.

Co-authored-by: kuznetsss <15742918+kuznetsss@users.noreply.github.com>
2025-01-30 09:16:34 +00:00
Peter Chen
7c2742036b refactor: Remove boost filesystem (#1859) 2025-01-29 11:16:54 -05:00
Alex Kremer
73f375f20d feat: ETLng task manager (#1843) 2025-01-29 15:29:13 +00:00
Alex Kremer
3e200d8b9d feat: Add Conan profiles for common Sanitizers to docker ci image (#1856)
For #1049
2025-01-29 14:45:00 +00:00
Alex Kremer
81fe617816 fix: Re-add account_tx max limit (#1855) 2025-01-29 13:42:31 +00:00
Sergey Kuznetsov
75354fbecd fix: CacheLoader causes crash when no cache is used (#1853)
If cache is disabled or Clio starts with and empty DB, `loader_` inside
cache is not created. So calling `CacheLoader::stop()` or
`CacheLoader::wait()` was causing crash.
2025-01-28 18:10:19 +00:00
Sergey Kuznetsov
540e938223 refactor: Use mutex from utils (#1851)
Fixes #1359.
2025-01-27 15:28:00 +00:00
Sergey Kuznetsov
6ef6ca9e65 chore: Fix issue found by clang-tidy (#1849)
Fixes #1848
2025-01-23 12:29:43 +00:00
github-actions[bot]
35b9a066e3 style: clang-tidy auto fixes (#1847)
Fixes #1846. Please review and commit clang-tidy fixes.

Co-authored-by: kuznetsss <15742918+kuznetsss@users.noreply.github.com>
2025-01-23 09:34:00 +00:00
Sergey Kuznetsov
957028699b feat: Graceful shutdown (#1801)
Fixes #442.
2025-01-22 13:09:16 +00:00
cyan317
12e6fcc97e fix: gateway_balance discrepancy (#1839)
Fix https://github.com/XRPLF/clio/issues/1832

rippled code:

https://github.com/XRPLF/rippled/blob/develop/src/xrpld/rpc/handlers/GatewayBalances.cpp#L129
2025-01-22 09:48:13 +00:00
github-actions[bot]
f9d9879513 style: clang-tidy auto fixes (#1845)
Fixes #1844. Please review and commit clang-tidy fixes.

Co-authored-by: kuznetsss <15742918+kuznetsss@users.noreply.github.com>
2025-01-22 09:46:37 +00:00
cyan317
278f7b1b58 feat: Block clio if migration is blocking (#1834)
Add:
- Block server if migration is blocking
- Initialise the migration related table when server starts against
empty DB

Add MigrationInspectorInterface. server uses inspector to check the
migrators status.
2025-01-21 14:10:01 +00:00
nkramer44
fbedeff697 fix: Remove InvalidHotWallet Error from gateway_balances RPC handler (#1830)
Fixes #1825 by removing the check in the gateway_balances RPC handler
that returns the RpcInvalidHotWallet error code if one of the addresses
supplied in the request's `hotwallet` array does not have a trustline
with the `account` from the request.

As stated in the original ticket, this change fixes a discrepancy in
behavior between Clio and rippled, as rippled does not check for
trustline existence when handling gateway_balances RPCs

Co-authored-by: Sergey Kuznetsov <skuznetsov@ripple.com>
2025-01-21 12:17:54 +00:00
cyan317
f64d8ecb77 fix: Copyright format (#1835)
Aligned the copyright notice to match the format used in other files.
Updated Copyright (c) 2022-2024 to Copyright (c) 2024 to ensure
consistency.
2025-01-16 14:36:03 +00:00
Peter Chen
3e38ea9b48 fix: Add more constraints to config (#1831)
Log file size and rotation should also not allowed to be 0.
2025-01-15 10:56:50 -05:00
Sergey Kuznetsov
7834b63b55 fix: Check result of parsing config (#1829) 2025-01-14 11:55:05 -05:00
Sergey Kuznetsov
1460d590f1 chore: Commits from develop for 2.4.0-b2 (#1811) 2025-01-07 13:58:12 +00:00
161 changed files with 4044 additions and 751 deletions

View File

@@ -28,7 +28,6 @@ Below are some useful docs to learn more about Clio.
**For Developers**:
- [How to build Clio](./docs/build-clio.md)
- [Metrics and static analysis](./docs/metrics-and-static-analysis.md)
- [Coverage report](./docs/coverage-report.md)
**For Operators**:

View File

@@ -28,7 +28,7 @@ class Clio(ConanFile):
'protobuf/3.21.9',
'grpc/1.50.1',
'openssl/1.1.1u',
'xrpl/2.4.0-b1',
'xrpl/2.4.0-b3',
'zlib/1.3.1',
'libbacktrace/cci.20210118'
]

View File

@@ -4,12 +4,13 @@ This image contains an environment to build [Clio](https://github.com/XRPLF/clio
It is used in [Clio Github Actions](https://github.com/XRPLF/clio/actions) but can also be used to compile Clio locally.
The image is based on Ubuntu 20.04 and contains:
- clang 16
- clang 16.0.6
- gcc 12.3
- doxygen 1.10
- doxygen 1.12
- gh 2.40
- ccache 4.8.3
- conan
- ccache 4.10.2
- conan 1.62
- and some other useful tools
Conan is set up to build Clio without any additional steps. There are two preset conan profiles: `clang` and `gcc` to use corresponding compiler.
Conan is set up to build Clio without any additional steps. There are two preset conan profiles: `clang` and `gcc` to use corresponding compiler. By default conan is setup to use `gcc`.
Sanitizer builds for `ASAN`, `TSAN` and `UBSAN` are enabled via conan profiles for each of the supported compilers. These can be selected using the following pattern (all lowercase): `[compiler].[sanitizer]` (e.g. `--profile gcc.tsan`).

View File

@@ -0,0 +1,9 @@
include(clang)
[options]
boost:extra_b2_flags="cxxflags=\"-fsanitize=address\" linkflags=\"-fsanitize=address\""
boost:without_stacktrace=True
[env]
CFLAGS="-fsanitize=address"
CXXFLAGS="-fsanitize=address"
LDFLAGS="-fsanitize=address"

View File

@@ -0,0 +1,9 @@
include(clang)
[options]
boost:extra_b2_flags="cxxflags=\"-fsanitize=thread\" linkflags=\"-fsanitize=thread\""
boost:without_stacktrace=True
[env]
CFLAGS="-fsanitize=thread"
CXXFLAGS="-fsanitize=thread"
LDFLAGS="-fsanitize=thread"

View File

@@ -0,0 +1,9 @@
include(clang)
[options]
boost:extra_b2_flags="cxxflags=\"-fsanitize=undefined\" linkflags=\"-fsanitize=undefined\""
boost:without_stacktrace=True
[env]
CFLAGS="-fsanitize=undefined"
CXXFLAGS="-fsanitize=undefined"
LDFLAGS="-fsanitize=undefined"

9
docker/ci/conan/gcc.asan Normal file
View File

@@ -0,0 +1,9 @@
include(gcc)
[options]
boost:extra_b2_flags="cxxflags=\"-fsanitize=address\" linkflags=\"-fsanitize=address\""
boost:without_stacktrace=True
[env]
CFLAGS="-fsanitize=address"
CXXFLAGS="-fsanitize=address"
LDFLAGS="-fsanitize=address"

9
docker/ci/conan/gcc.tsan Normal file
View File

@@ -0,0 +1,9 @@
include(gcc)
[options]
boost:extra_b2_flags="cxxflags=\"-fsanitize=thread\" linkflags=\"-fsanitize=thread\""
boost:without_stacktrace=True
[env]
CFLAGS="-fsanitize=thread"
CXXFLAGS="-fsanitize=thread"
LDFLAGS="-fsanitize=thread"

View File

@@ -0,0 +1,9 @@
include(gcc)
[options]
boost:extra_b2_flags="cxxflags=\"-fsanitize=undefined\" linkflags=\"-fsanitize=undefined\""
boost:without_stacktrace=True
[env]
CFLAGS="-fsanitize=undefined"
CXXFLAGS="-fsanitize=undefined"
LDFLAGS="-fsanitize=undefined"

View File

@@ -98,3 +98,10 @@ RUN conan profile new clang --detect \
&& conan profile update "conf.tools.build:compiler_executables={\"c\": \"/usr/bin/clang-16\", \"cpp\": \"/usr/bin/clang++-16\"}" clang
RUN echo "include(gcc)" >> .conan/profiles/default
COPY conan/gcc.asan /root/.conan/profiles
COPY conan/gcc.tsan /root/.conan/profiles
COPY conan/gcc.ubsan /root/.conan/profiles
COPY conan/clang.asan /root/.conan/profiles
COPY conan/clang.tsan /root/.conan/profiles
COPY conan/clang.ubsan /root/.conan/profiles

View File

@@ -181,3 +181,20 @@ Sometimes, during development, you need to build against a custom version of `li
4. Build Clio as you would have before.
See [Building Clio](#building-clio) for details.
## Using `clang-tidy` for static analysis
The minimum [clang-tidy](https://clang.llvm.org/extra/clang-tidy/) version required is 19.0.
Clang-tidy can be run by Cmake when building the project. To achieve this, you just need to provide the option `-o lint=True` for the `conan install` command:
```sh
conan install .. --output-folder . --build missing --settings build_type=Release -o tests=True -o lint=True
```
By default Cmake will try to find `clang-tidy` automatically in your system.
To force Cmake to use your desired binary, set the `CLIO_CLANG_TIDY_BIN` environment variable to the path of the `clang-tidy` binary. For example:
```sh
export CLIO_CLANG_TIDY_BIN=/opt/homebrew/opt/llvm@19/bin/clang-tidy
```

View File

@@ -1,30 +0,0 @@
# Metrics and static analysis
## Prometheus metrics collection
Clio natively supports [Prometheus](https://prometheus.io/) metrics collection. It accepts Prometheus requests on the port configured in the `server` section of the config.
Prometheus metrics are enabled by default, and replies to `/metrics` are compressed. To disable compression, and have human readable metrics, add `"prometheus": { "enabled": true, "compress_reply": false }` to Clio's config.
To completely disable Prometheus metrics add `"prometheus": { "enabled": false }` to Clio's config.
It is important to know that Clio responds to Prometheus request only if they are admin requests. If you are using the admin password feature, the same password should be provided in the Authorization header of Prometheus requests.
You can find an example docker-compose file, with Prometheus and Grafana configs, in [examples/infrastructure](../docs/examples/infrastructure/).
## Using `clang-tidy` for static analysis
The minimum [clang-tidy](https://clang.llvm.org/extra/clang-tidy/) version required is 19.0.
Clang-tidy can be run by Cmake when building the project. To achieve this, you just need to provide the option `-o lint=True` for the `conan install` command:
```sh
conan install .. --output-folder . --build missing --settings build_type=Release -o tests=True -o lint=True
```
By default Cmake will try to find `clang-tidy` automatically in your system.
To force Cmake to use your desired binary, set the `CLIO_CLANG_TIDY_BIN` environment variable to the path of the `clang-tidy` binary. For example:
```sh
export CLIO_CLANG_TIDY_BIN=/opt/homebrew/opt/llvm@19/bin/clang-tidy
```

View File

@@ -80,3 +80,15 @@ Clio will fallback to hardcoded defaults when these values are not specified in
> [!TIP]
> See the [example-config.json](../docs/examples/config/example-config.json) for more details.
## Prometheus metrics collection
Clio natively supports [Prometheus](https://prometheus.io/) metrics collection. It accepts Prometheus requests on the port configured in the `server` section of the config.
Prometheus metrics are enabled by default, and replies to `/metrics` are compressed. To disable compression, and have human readable metrics, add `"prometheus": { "enabled": true, "compress_reply": false }` to Clio's config.
To completely disable Prometheus metrics add `"prometheus": { "enabled": false }` to Clio's config.
It is important to know that Clio responds to Prometheus request only if they are admin requests. If you are using the admin password feature, the same password should be provided in the Authorization header of Prometheus requests.
You can find an example docker-compose file, with Prometheus and Grafana configs, in [examples/infrastructure](../docs/examples/infrastructure/).

View File

@@ -1,4 +1,4 @@
add_library(clio_app)
target_sources(clio_app PRIVATE CliArgs.cpp ClioApplication.cpp WebHandlers.cpp)
target_sources(clio_app PRIVATE CliArgs.cpp ClioApplication.cpp Stopper.cpp WebHandlers.cpp)
target_link_libraries(clio_app PUBLIC clio_etl clio_etlng clio_feed clio_web clio_rpc clio_migration)

View File

@@ -19,6 +19,7 @@
#include "app/ClioApplication.hpp"
#include "app/Stopper.hpp"
#include "app/WebHandlers.hpp"
#include "data/AmendmentCenter.hpp"
#include "data/BackendFactory.hpp"
@@ -26,6 +27,7 @@
#include "etl/LoadBalancer.hpp"
#include "etl/NetworkValidatedLedgers.hpp"
#include "feed/SubscriptionManager.hpp"
#include "migration/MigrationInspectorFactory.hpp"
#include "rpc/Counters.hpp"
#include "rpc/RPCEngine.hpp"
#include "rpc/WorkQueue.hpp"
@@ -83,6 +85,7 @@ ClioApplication::ClioApplication(util::config::ClioConfigDefinition const& confi
{
LOG(util::LogService::info()) << "Clio version: " << util::build::getClioFullVersionString();
PrometheusService::init(config);
signalsHandler_.subscribeToStop([this]() { appStopper_.stop(); });
}
int
@@ -103,6 +106,16 @@ ClioApplication::run(bool const useNgWebServer)
// Interface to the database
auto backend = data::makeBackend(config_);
{
auto const migrationInspector = migration::makeMigrationInspector(config_, backend);
// Check if any migration is blocking Clio server starting.
if (migrationInspector->isBlockingClio() and backend->hardFetchLedgerRangeNoThrow()) {
LOG(util::LogService::error())
<< "Existing Migration is blocking Clio, Please complete the database migration first.";
return EXIT_FAILURE;
}
}
// Manages clients subscribed to streams
auto subscriptions = feed::SubscriptionManager::makeSubscriptionManager(config_, backend);
@@ -158,6 +171,10 @@ ClioApplication::run(bool const useNgWebServer)
return EXIT_FAILURE;
}
appStopper_.setOnStop(
Stopper::makeOnStopCallback(httpServer.value(), *balancer, *etl, *subscriptions, *backend, ioc)
);
// Blocks until stopped.
// When stopped, shared_ptrs fall out of scope
// Calls destructors on all resources, and destructs in order

View File

@@ -19,6 +19,7 @@
#pragma once
#include "app/Stopper.hpp"
#include "util/SignalsHandler.hpp"
#include "util/newconfig/ConfigDefinition.hpp"
@@ -30,6 +31,7 @@ namespace app {
class ClioApplication {
util::config::ClioConfigDefinition const& config_;
util::SignalsHandler signalsHandler_;
Stopper appStopper_;
public:
/**

52
src/app/Stopper.cpp Normal file
View File

@@ -0,0 +1,52 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "app/Stopper.hpp"
#include <boost/asio/spawn.hpp>
#include <functional>
#include <thread>
#include <utility>
namespace app {
Stopper::~Stopper()
{
if (worker_.joinable())
worker_.join();
}
void
Stopper::setOnStop(std::function<void(boost::asio::yield_context)> cb)
{
boost::asio::spawn(ctx_, std::move(cb));
}
void
Stopper::stop()
{
// Do nothing if worker_ is already running
if (worker_.joinable())
return;
worker_ = std::thread{[this]() { ctx_.run(); }};
}
} // namespace app

118
src/app/Stopper.hpp Normal file
View File

@@ -0,0 +1,118 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "data/BackendInterface.hpp"
#include "etl/ETLService.hpp"
#include "etl/LoadBalancer.hpp"
#include "feed/SubscriptionManagerInterface.hpp"
#include "util/CoroutineGroup.hpp"
#include "util/log/Logger.hpp"
#include "web/ng/Server.hpp"
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <functional>
#include <thread>
namespace app {
/**
* @brief Application stopper class. On stop it will create a new thread to run all the shutdown tasks.
*/
class Stopper {
boost::asio::io_context ctx_;
std::thread worker_;
public:
/**
* @brief Destroy the Stopper object
*/
~Stopper();
/**
* @brief Set the callback to be called when the application is stopped.
*
* @param cb The callback to be called on application stop.
*/
void
setOnStop(std::function<void(boost::asio::yield_context)> cb);
/**
* @brief Stop the application and run the shutdown tasks.
*/
void
stop();
/**
* @brief Create a callback to be called on application stop.
*
* @param server The server to stop.
* @param balancer The load balancer to stop.
* @param etl The ETL service to stop.
* @param subscriptions The subscription manager to stop.
* @param backend The backend to stop.
* @param ioc The io_context to stop.
* @return The callback to be called on application stop.
*/
template <
web::ng::SomeServer ServerType,
etl::SomeLoadBalancer LoadBalancerType,
etl::SomeETLService ETLServiceType>
static std::function<void(boost::asio::yield_context)>
makeOnStopCallback(
ServerType& server,
LoadBalancerType& balancer,
ETLServiceType& etl,
feed::SubscriptionManagerInterface& subscriptions,
data::BackendInterface& backend,
boost::asio::io_context& ioc
)
{
return [&](boost::asio::yield_context yield) {
util::CoroutineGroup coroutineGroup{yield};
coroutineGroup.spawn(yield, [&server](auto innerYield) {
server.stop(innerYield);
LOG(util::LogService::info()) << "Server stopped";
});
coroutineGroup.spawn(yield, [&balancer](auto innerYield) {
balancer.stop(innerYield);
LOG(util::LogService::info()) << "LoadBalancer stopped";
});
coroutineGroup.asyncWait(yield);
etl.stop();
LOG(util::LogService::info()) << "ETL stopped";
subscriptions.stop();
LOG(util::LogService::info()) << "SubscriptionManager stopped";
backend.waitForWritesToFinish();
LOG(util::LogService::info()) << "Backend writes finished";
ioc.stop();
LOG(util::LogService::info()) << "io_context stopped";
};
}
};
} // namespace app

View File

@@ -35,7 +35,7 @@ namespace app {
* @return true if config values are all correct, false otherwise
*/
inline bool
verifyConfig(std::string_view configPath)
parseConfig(std::string_view configPath)
{
using namespace util::config;
@@ -54,4 +54,5 @@ verifyConfig(std::string_view configPath)
}
return true;
}
} // namespace app

View File

@@ -132,6 +132,8 @@ struct Amendments {
REGISTER(fixAMMv1_2);
REGISTER(AMMClawback);
REGISTER(Credentials);
REGISTER(DynamicNFT);
REGISTER(PermissionedDomains);
// Obsolete but supported by libxrpl
REGISTER(CryptoConditionsSuite);

View File

@@ -683,6 +683,12 @@ public:
bool
finishWrites(std::uint32_t ledgerSequence);
/**
* @brief Wait for all pending writes to finish.
*/
virtual void
waitForWritesToFinish() = 0;
/**
* @brief Mark the migration status of a migrator as Migrated in the database
*

View File

@@ -188,11 +188,16 @@ public:
return {txns, {}};
}
void
waitForWritesToFinish() override
{
executor_.sync();
}
bool
doFinishWrites() override
{
// wait for other threads to finish their writes
executor_.sync();
waitForWritesToFinish();
if (!range_) {
executor_.writeSync(schema_->updateLedgerRange, ledgerSequence_, false, ledgerSequence_);
@@ -619,7 +624,6 @@ public:
return seq;
}
LOG(log_.debug()) << "Could not fetch ledger object sequence - no rows";
} else {
LOG(log_.error()) << "Could not fetch ledger object sequence: " << res.error();
}
@@ -943,28 +947,35 @@ public:
statements.reserve(data.size() * 3);
for (NFTsData const& record : data) {
statements.push_back(
schema_->insertNFT.bind(record.tokenID, record.ledgerSequence, record.owner, record.isBurned)
);
if (!record.onlyUriChanged) {
statements.push_back(
schema_->insertNFT.bind(record.tokenID, record.ledgerSequence, record.owner, record.isBurned)
);
// If `uri` is set (and it can be set to an empty uri), we know this
// is a net-new NFT. That is, this NFT has not been seen before by
// us _OR_ it is in the extreme edge case of a re-minted NFT ID with
// the same NFT ID as an already-burned token. In this case, we need
// to record the URI and link to the issuer_nf_tokens table.
if (record.uri) {
statements.push_back(schema_->insertIssuerNFT.bind(
ripple::nft::getIssuer(record.tokenID),
static_cast<uint32_t>(ripple::nft::getTaxon(record.tokenID)),
record.tokenID
));
// If `uri` is set (and it can be set to an empty uri), we know this
// is a net-new NFT. That is, this NFT has not been seen before by
// us _OR_ it is in the extreme edge case of a re-minted NFT ID with
// the same NFT ID as an already-burned token. In this case, we need
// to record the URI and link to the issuer_nf_tokens table.
if (record.uri) {
statements.push_back(schema_->insertIssuerNFT.bind(
ripple::nft::getIssuer(record.tokenID),
static_cast<uint32_t>(ripple::nft::getTaxon(record.tokenID)),
record.tokenID
));
statements.push_back(
schema_->insertNFTURI.bind(record.tokenID, record.ledgerSequence, record.uri.value())
);
}
} else {
// only uri changed, we update the uri table only
statements.push_back(
schema_->insertNFTURI.bind(record.tokenID, record.ledgerSequence, record.uri.value())
);
}
}
executor_.write(std::move(statements));
executor_.writeEach(std::move(statements));
}
void

View File

@@ -107,6 +107,7 @@ struct NFTsData {
ripple::AccountID owner;
std::optional<ripple::Blob> uri;
bool isBurned = false;
bool onlyUriChanged = false; // Whether only the URI was changed
/**
* @brief Construct a new NFTsData object
@@ -170,6 +171,23 @@ struct NFTsData {
: tokenID(tokenID), ledgerSequence(ledgerSequence), owner(owner), uri(uri)
{
}
/**
* @brief Construct a new NFTsData object with only the URI changed
*
* @param tokenID The token ID
* @param meta The transaction metadata
* @param uri The new URI
*
*/
NFTsData(ripple::uint256 const& tokenID, ripple::TxMeta const& meta, ripple::Blob const& uri)
: tokenID(tokenID)
, ledgerSequence(meta.getLgrSeq())
, transactionIndex(meta.getIndex())
, uri(uri)
, onlyUriChanged(true)
{
}
};
/**

View File

@@ -23,6 +23,7 @@
#include "data/cassandra/Handle.hpp"
#include "data/cassandra/Types.hpp"
#include "data/cassandra/impl/RetryPolicy.hpp"
#include "util/Mutex.hpp"
#include "util/log/Logger.hpp"
#include <boost/asio.hpp>
@@ -64,8 +65,8 @@ class AsyncExecutor : public std::enable_shared_from_this<AsyncExecutor<Statemen
RetryCallbackType onRetry_;
// does not exist during initial construction, hence optional
std::optional<FutureWithCallbackType> future_;
std::mutex mtx_;
using OptionalFuture = std::optional<FutureWithCallbackType>;
util::Mutex<OptionalFuture> future_;
public:
/**
@@ -127,8 +128,8 @@ private:
self = nullptr; // explicitly decrement refcount
};
std::scoped_lock const lck{mtx_};
future_.emplace(handle.asyncExecute(data_, std::move(handler)));
auto future = future_.template lock<std::scoped_lock>();
future->emplace(handle.asyncExecute(data_, std::move(handler)));
}
};

View File

@@ -35,6 +35,7 @@
#include <boost/asio/spawn.hpp>
#include <boost/json/object.hpp>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
@@ -192,10 +193,24 @@ public:
template <typename... Args>
void
write(PreparedStatementType const& preparedStatement, Args&&... args)
{
auto statement = preparedStatement.bind(std::forward<Args>(args)...);
write(std::move(statement));
}
/**
* @brief Non-blocking query execution used for writing data.
*
* Retries forever with retry policy specified by @ref AsyncExecutor
*
* @param statement Statement to execute
* @throw DatabaseTimeout on timeout
*/
void
write(StatementType&& statement)
{
auto const startTime = std::chrono::steady_clock::now();
auto statement = preparedStatement.bind(std::forward<Args>(args)...);
incrementOutstandingRequestCount();
counters_->registerWriteStarted();
@@ -251,6 +266,21 @@ public:
});
}
/**
* @brief Non-blocking query execution used for writing data. Constrast with write, this method does not execute
* the statements in a batch.
*
* Retries forever with retry policy specified by @ref AsyncExecutor.
*
* @param statements Vector of statements to execute
* @throw DatabaseTimeout on timeout
*/
void
writeEach(std::vector<StatementType>&& statements)
{
std::ranges::for_each(std::move(statements), [this](auto& statement) { this->write(std::move(statement)); });
}
/**
* @brief Coroutine-based query execution used for reading data.
*

View File

@@ -130,7 +130,8 @@ public:
void
stop() noexcept
{
loader_->stop();
if (loader_ != nullptr)
loader_->stop();
}
/**
@@ -139,7 +140,8 @@ public:
void
wait() noexcept
{
loader_->wait();
if (loader_ != nullptr)
loader_->wait();
}
};

View File

@@ -42,6 +42,7 @@
#include <org/xrpl/rpc/v1/get_ledger.pb.h>
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <concepts>
#include <cstddef>
#include <cstdint>
#include <memory>
@@ -58,6 +59,16 @@ struct NFTsData;
*/
namespace etl {
/**
* @brief A tag class to help identify ETLService in templated code.
*/
struct ETLServiceTag {
virtual ~ETLServiceTag() = default;
};
template <typename T>
concept SomeETLService = std::derived_from<T, ETLServiceTag>;
/**
* @brief This class is responsible for continuously extracting data from a p2p node, and writing that data to the
* databases.
@@ -71,7 +82,7 @@ namespace etl {
* the others will fall back to monitoring/publishing. In this sense, this class dynamically transitions from monitoring
* to writing and from writing to monitoring, based on the activity of other processes running on different machines.
*/
class ETLService {
class ETLService : public ETLServiceTag {
// TODO: make these template parameters in ETLService
using LoadBalancerType = LoadBalancer;
using DataPipeType = etl::impl::ExtractionDataPipe<org::xrpl::rpc::v1::GetLedgerResponse>;
@@ -159,10 +170,20 @@ public:
/**
* @brief Stops components and joins worker thread.
*/
~ETLService()
~ETLService() override
{
LOG(log_.info()) << "onStop called";
LOG(log_.debug()) << "Stopping Reporting ETL";
if (not state_.isStopping)
stop();
}
/**
* @brief Stop the ETL service.
* @note This method blocks until the ETL service has stopped.
*/
void
stop()
{
LOG(log_.info()) << "Stop called";
state_.isStopping = true;
cacheLoader_.stop();

View File

@@ -26,6 +26,7 @@
#include "feed/SubscriptionManagerInterface.hpp"
#include "rpc/Errors.hpp"
#include "util/Assert.hpp"
#include "util/CoroutineGroup.hpp"
#include "util/Random.hpp"
#include "util/ResponseExpirationCache.hpp"
#include "util/log/Logger.hpp"
@@ -336,6 +337,16 @@ LoadBalancer::getETLState() noexcept
return etlState_;
}
void
LoadBalancer::stop(boost::asio::yield_context yield)
{
util::CoroutineGroup group{yield};
std::ranges::for_each(sources_, [&group, yield](auto& source) {
group.spawn(yield, [&source](boost::asio::yield_context innerYield) { source->stop(innerYield); });
});
group.asyncWait(yield);
}
void
LoadBalancer::chooseForwardingSource()
{

View File

@@ -41,6 +41,7 @@
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <chrono>
#include <concepts>
#include <cstdint>
#include <expected>
#include <memory>
@@ -51,6 +52,16 @@
namespace etl {
/**
* @brief A tag class to help identify LoadBalancer in templated code.
*/
struct LoadBalancerTag {
virtual ~LoadBalancerTag() = default;
};
template <typename T>
concept SomeLoadBalancer = std::derived_from<T, LoadBalancerTag>;
/**
* @brief This class is used to manage connections to transaction processing processes.
*
@@ -58,7 +69,7 @@ namespace etl {
* which ledgers have been validated by the network, and the range of ledgers each etl source has). This class also
* allows requests for ledger data to be load balanced across all possible ETL sources.
*/
class LoadBalancer {
class LoadBalancer : public LoadBalancerTag {
public:
using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject;
using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse;
@@ -132,7 +143,7 @@ public:
SourceFactory sourceFactory = makeSource
);
~LoadBalancer();
~LoadBalancer() override;
/**
* @brief Load the initial ledger, writing data to the queue.
@@ -203,6 +214,15 @@ public:
std::optional<ETLState>
getETLState() noexcept;
/**
* @brief Stop the load balancer. This will stop all subscription sources.
* @note This function will asynchronously wait for all sources to stop.
*
* @param yield The coroutine context
*/
void
stop(boost::asio::yield_context yield);
private:
/**
* @brief Execute a function on a randomly selected source.

View File

@@ -47,6 +47,17 @@
namespace etl {
std::pair<std::vector<NFTTransactionsData>, std::optional<NFTsData>>
getNftokenModifyData(ripple::TxMeta const& txMeta, ripple::STTx const& sttx)
{
auto const tokenID = sttx.getFieldH256(ripple::sfNFTokenID);
// note: sfURI is optional, if it is absent, we will update the uri as empty string
return {
{NFTTransactionsData(sttx.getFieldH256(ripple::sfNFTokenID), txMeta, sttx.getTransactionID())},
NFTsData(tokenID, txMeta, sttx.getFieldVL(ripple::sfURI))
};
}
std::pair<std::vector<NFTTransactionsData>, std::optional<NFTsData>>
getNFTokenMintData(ripple::TxMeta const& txMeta, ripple::STTx const& sttx)
{
@@ -166,7 +177,7 @@ getNFTokenBurnData(ripple::TxMeta const& txMeta, ripple::STTx const& sttx)
node.peekAtField(ripple::sfPreviousFields).downcast<ripple::STObject>();
if (previousFields.isFieldPresent(ripple::sfNFTokens))
prevNFTs = previousFields.getFieldArray(ripple::sfNFTokens);
} else if (!prevNFTs && node.getFName() == ripple::sfDeletedNode) {
} else if (node.getFName() == ripple::sfDeletedNode) {
prevNFTs =
node.peekAtField(ripple::sfFinalFields).downcast<ripple::STObject>().getFieldArray(ripple::sfNFTokens);
}
@@ -336,6 +347,9 @@ getNFTDataFromTx(ripple::TxMeta const& txMeta, ripple::STTx const& sttx)
case ripple::TxType::ttNFTOKEN_CREATE_OFFER:
return getNFTokenCreateOfferData(txMeta, sttx);
case ripple::TxType::ttNFTOKEN_MODIFY:
return getNftokenModifyData(txMeta, sttx);
default:
return {{}, {}};
}

View File

@@ -33,6 +33,16 @@
namespace etl {
/**
* @brief Get the NFT URI change data from a NFToken Modify transaction
*
* @param txMeta Transaction metadata
* @param sttx The transaction
* @return NFT URI change data as a pair of transactions and optional NFTsData
*/
std::pair<std::vector<NFTTransactionsData>, std::optional<NFTsData>>
getNftokenModifyData(ripple::TxMeta const& txMeta, ripple::STTx const& sttx);
/**
* @brief Get the NFT Token mint data from a transaction
*

View File

@@ -65,6 +65,15 @@ public:
virtual void
run() = 0;
/**
* @brief Stop Source.
* @note This method will asynchronously wait for source to be stopped.
*
* @param yield The coroutine context.
*/
virtual void
stop(boost::asio::yield_context yield) = 0;
/**
* @brief Check if source is connected
*

View File

@@ -57,6 +57,7 @@ struct FormattedTransactionsData {
std::vector<NFTTransactionsData> nfTokenTxData;
std::vector<NFTsData> nfTokensData;
std::vector<MPTHolderData> mptHoldersData;
std::vector<NFTsData> nfTokenURIChanges;
};
namespace etl::impl {
@@ -111,6 +112,7 @@ public:
{
FormattedTransactionsData result;
std::vector<NFTsData> nfTokenURIChanges;
for (auto& txn : *(data.mutable_transactions_list()->mutable_transactions())) {
std::string* raw = txn.mutable_transaction_blob();
@@ -123,8 +125,15 @@ public:
auto const [nftTxs, maybeNFT] = getNFTDataFromTx(txMeta, sttx);
result.nfTokenTxData.insert(result.nfTokenTxData.end(), nftTxs.begin(), nftTxs.end());
if (maybeNFT)
result.nfTokensData.push_back(*maybeNFT);
// We need to unique the URI changes separately, in case the URI changes are discarded
if (maybeNFT) {
if (maybeNFT->onlyUriChanged) {
nfTokenURIChanges.push_back(*maybeNFT);
} else {
result.nfTokensData.push_back(*maybeNFT);
}
}
auto const maybeMPTHolder = getMPTHolderFromTx(txMeta, sttx);
if (maybeMPTHolder)
@@ -143,6 +152,10 @@ public:
}
result.nfTokensData = getUniqueNFTsDatas(result.nfTokensData);
nfTokenURIChanges = getUniqueNFTsDatas(nfTokenURIChanges);
// Put uri change at the end to ensure the uri not overwritten
result.nfTokensData.insert(result.nfTokensData.end(), nfTokenURIChanges.begin(), nfTokenURIChanges.end());
return result;
}

View File

@@ -102,6 +102,12 @@ public:
subscriptionSource_->run();
}
void
stop(boost::asio::yield_context yield) final
{
subscriptionSource_->stop(yield);
}
/**
* @brief Check if source is connected
*

View File

@@ -35,7 +35,6 @@
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/use_future.hpp>
#include <boost/beast/http/field.hpp>
#include <boost/json/object.hpp>
#include <boost/json/parse.hpp>
@@ -49,7 +48,6 @@
#include <cstdint>
#include <exception>
#include <expected>
#include <future>
#include <memory>
#include <optional>
#include <stdexcept>
@@ -92,15 +90,6 @@ SubscriptionSource::SubscriptionSource(
.setConnectionTimeout(wsTimeout_);
}
SubscriptionSource::~SubscriptionSource()
{
stop();
retry_.cancel();
if (runFuture_.valid())
runFuture_.wait();
}
void
SubscriptionSource::run()
{
@@ -157,59 +146,53 @@ SubscriptionSource::validatedRange() const
}
void
SubscriptionSource::stop()
SubscriptionSource::stop(boost::asio::yield_context yield)
{
stop_ = true;
stopHelper_.asyncWaitForStop(yield);
}
void
SubscriptionSource::subscribe()
{
runFuture_ = boost::asio::spawn(
strand_,
[this, _ = boost::asio::make_work_guard(strand_)](boost::asio::yield_context yield) {
auto connection = wsConnectionBuilder_.connect(yield);
if (not connection) {
handleError(connection.error(), yield);
return;
}
boost::asio::spawn(strand_, [this, _ = boost::asio::make_work_guard(strand_)](boost::asio::yield_context yield) {
if (auto connection = wsConnectionBuilder_.connect(yield); connection) {
wsConnection_ = std::move(connection).value();
} else {
handleError(connection.error(), yield);
return;
}
auto const& subscribeCommand = getSubscribeCommandJson();
auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield, wsTimeout_);
if (writeErrorOpt) {
handleError(writeErrorOpt.value(), yield);
auto const& subscribeCommand = getSubscribeCommandJson();
if (auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield, wsTimeout_); writeErrorOpt) {
handleError(writeErrorOpt.value(), yield);
return;
}
isConnected_ = true;
LOG(log_.info()) << "Connected";
onConnect_();
retry_.reset();
while (!stop_) {
auto const message = wsConnection_->read(yield, wsTimeout_);
if (not message) {
handleError(message.error(), yield);
return;
}
isConnected_ = true;
LOG(log_.info()) << "Connected";
onConnect_();
retry_.reset();
while (!stop_) {
auto const message = wsConnection_->read(yield, wsTimeout_);
if (not message) {
handleError(message.error(), yield);
return;
}
auto const handleErrorOpt = handleMessage(message.value());
if (handleErrorOpt) {
handleError(handleErrorOpt.value(), yield);
return;
}
if (auto const handleErrorOpt = handleMessage(message.value()); handleErrorOpt) {
handleError(handleErrorOpt.value(), yield);
return;
}
// Close the connection
handleError(
util::requests::RequestError{"Subscription source stopped", boost::asio::error::operation_aborted},
yield
);
},
boost::asio::use_future
);
}
// Close the connection
handleError(
util::requests::RequestError{"Subscription source stopped", boost::asio::error::operation_aborted}, yield
);
});
}
std::optional<util::requests::RequestError>
@@ -299,6 +282,8 @@ SubscriptionSource::handleError(util::requests::RequestError const& error, boost
logError(error);
if (not stop_) {
retry_.retry([this] { subscribe(); });
} else {
stopHelper_.readyToStop();
}
}

View File

@@ -24,6 +24,7 @@
#include "feed/SubscriptionManagerInterface.hpp"
#include "util/Mutex.hpp"
#include "util/Retry.hpp"
#include "util/StopHelper.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Gauge.hpp"
#include "util/requests/Types.hpp"
@@ -39,7 +40,6 @@
#include <chrono>
#include <cstdint>
#include <functional>
#include <future>
#include <memory>
#include <optional>
#include <string>
@@ -50,6 +50,7 @@ namespace etl::impl {
/**
* @brief This class is used to subscribe to a source of ledger data and forward it to the subscription manager.
* @note This class is safe to delete only if io_context is stopped.
*/
class SubscriptionSource {
public:
@@ -89,7 +90,7 @@ private:
std::reference_wrapper<util::prometheus::GaugeInt> lastMessageTimeSecondsSinceEpoch_;
std::future<void> runFuture_;
util::StopHelper stopHelper_;
static constexpr std::chrono::seconds kWS_TIMEOUT{30};
static constexpr std::chrono::seconds kRETRY_MAX_DELAY{30};
@@ -124,13 +125,6 @@ public:
std::chrono::steady_clock::duration const retryDelay = SubscriptionSource::kRETRY_DELAY
);
/**
* @brief Destroy the Subscription Source object
*
* @note This will block to wait for all the async operations to complete. io_context must be still running
*/
~SubscriptionSource();
/**
* @brief Run the source
*/
@@ -192,7 +186,7 @@ public:
* @brief Stop the source. The source will complete already scheduled operations but will not schedule new ones
*/
void
stop();
stop(boost::asio::yield_context yield);
private:
void

View File

@@ -2,7 +2,7 @@ add_library(clio_etlng)
target_sources(
clio_etlng PRIVATE impl/AmendmentBlockHandler.cpp impl/AsyncGrpcCall.cpp impl/Extraction.cpp impl/GrpcSource.cpp
impl/Loading.cpp
impl/Loading.cpp impl/TaskManager.cpp
)
target_link_libraries(clio_etlng PUBLIC clio_data)

View File

@@ -0,0 +1,49 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <chrono>
#include <cstdint>
#include <optional>
namespace etlng {
/**
* @brief The interface of a scheduler for the extraction proccess
*/
struct LedgerPublisherInterface {
virtual ~LedgerPublisherInterface() = default;
/**
* @brief Publish the ledger by its sequence number
*
* @param seq The sequence number of the ledger
* @param maxAttempts The maximum number of attempts to publish the ledger; no limit if nullopt
* @param attemptsDelay The delay between attempts
*/
virtual void
publish(
uint32_t seq,
std::optional<uint32_t> maxAttempts,
std::chrono::steady_clock::duration attemptsDelay = std::chrono::seconds{1}
) = 0;
};
} // namespace etlng

View File

@@ -0,0 +1,141 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etlng/impl/TaskManager.hpp"
#include "etlng/ExtractorInterface.hpp"
#include "etlng/LoaderInterface.hpp"
#include "etlng/Models.hpp"
#include "etlng/SchedulerInterface.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyOperation.hpp"
#include "util/async/AnyStrand.hpp"
#include "util/log/Logger.hpp"
#include <chrono>
#include <cstddef>
#include <functional>
#include <ranges>
#include <thread>
#include <utility>
#include <vector>
namespace etlng::impl {
TaskManager::TaskManager(
util::async::AnyExecutionContext&& ctx,
std::reference_wrapper<SchedulerInterface> scheduler,
std::reference_wrapper<ExtractorInterface> extractor,
std::reference_wrapper<LoaderInterface> loader
)
: ctx_(std::move(ctx)), schedulers_(scheduler), extractor_(extractor), loader_(loader)
{
}
TaskManager::~TaskManager()
{
stop();
}
void
TaskManager::run(Settings settings)
{
static constexpr auto kQUEUE_SIZE_LIMIT = 2048uz;
auto schedulingStrand = ctx_.makeStrand();
PriorityQueue queue(ctx_.makeStrand(), kQUEUE_SIZE_LIMIT);
LOG(log_.debug()) << "Starting task manager...\n";
extractors_.reserve(settings.numExtractors);
for ([[maybe_unused]] auto _ : std::views::iota(0uz, settings.numExtractors))
extractors_.push_back(spawnExtractor(schedulingStrand, queue));
loaders_.reserve(settings.numLoaders);
for ([[maybe_unused]] auto _ : std::views::iota(0uz, settings.numLoaders))
loaders_.push_back(spawnLoader(queue));
wait();
LOG(log_.debug()) << "All finished in task manager..\n";
}
util::async::AnyOperation<void>
TaskManager::spawnExtractor(util::async::AnyStrand& strand, PriorityQueue& queue)
{
// TODO: these values may be extracted to config later and/or need to be fine-tuned on a realistic system
static constexpr auto kDELAY_BETWEEN_ATTEMPTS = std::chrono::milliseconds{100u};
static constexpr auto kDELAY_BETWEEN_ENQUEUE_ATTEMPTS = std::chrono::milliseconds{1u};
return strand.execute([this, &queue](auto stopRequested) {
while (not stopRequested) {
if (auto task = schedulers_.get().next(); task.has_value()) {
if (auto maybeBatch = extractor_.get().extractLedgerWithDiff(task->seq); maybeBatch.has_value()) {
LOG(log_.debug()) << "Adding data after extracting diff";
while (not queue.enqueue(*maybeBatch)) {
// TODO (https://github.com/XRPLF/clio/issues/1852)
std::this_thread::sleep_for(kDELAY_BETWEEN_ENQUEUE_ATTEMPTS);
if (stopRequested)
break;
}
} else {
// TODO: how do we signal to the loaders that it's time to shutdown? some special task?
break; // TODO: handle server shutdown or other node took over ETL
}
} else {
// TODO (https://github.com/XRPLF/clio/issues/1852)
std::this_thread::sleep_for(kDELAY_BETWEEN_ATTEMPTS);
}
}
});
}
util::async::AnyOperation<void>
TaskManager::spawnLoader(PriorityQueue& queue)
{
return ctx_.execute([this, &queue](auto stopRequested) {
while (not stopRequested) {
// TODO (https://github.com/XRPLF/clio/issues/66): does not tell the loader whether it's out of order or not
if (auto data = queue.dequeue(); data.has_value())
loader_.get().load(*data);
}
});
}
void
TaskManager::wait()
{
for (auto& extractor : extractors_)
extractor.wait();
for (auto& loader : loaders_)
loader.wait();
}
void
TaskManager::stop()
{
for (auto& extractor : extractors_)
extractor.abort();
for (auto& loader : loaders_)
loader.abort();
wait();
}
} // namespace etlng::impl

View File

@@ -0,0 +1,94 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "etlng/ExtractorInterface.hpp"
#include "etlng/LoaderInterface.hpp"
#include "etlng/Models.hpp"
#include "etlng/SchedulerInterface.hpp"
#include "util/StrandedPriorityQueue.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyOperation.hpp"
#include "util/async/AnyStrand.hpp"
#include "util/log/Logger.hpp"
#include <xrpl/protocol/TxFormats.h>
#include <cstddef>
#include <functional>
#include <vector>
namespace etlng::impl {
class TaskManager {
util::async::AnyExecutionContext ctx_;
std::reference_wrapper<SchedulerInterface> schedulers_;
std::reference_wrapper<ExtractorInterface> extractor_;
std::reference_wrapper<LoaderInterface> loader_;
std::vector<util::async::AnyOperation<void>> extractors_;
std::vector<util::async::AnyOperation<void>> loaders_;
util::Logger log_{"ETL"};
struct ReverseOrderComparator {
[[nodiscard]] bool
operator()(model::LedgerData const& lhs, model::LedgerData const& rhs) const noexcept
{
return lhs.seq > rhs.seq;
}
};
public:
struct Settings {
size_t numExtractors; /**< number of extraction tasks */
size_t numLoaders; /**< number of loading tasks */
};
// reverse order loading is needed (i.e. start with oldest seq in forward fill buffer)
using PriorityQueue = util::StrandedPriorityQueue<model::LedgerData, ReverseOrderComparator>;
TaskManager(
util::async::AnyExecutionContext&& ctx,
std::reference_wrapper<SchedulerInterface> scheduler,
std::reference_wrapper<ExtractorInterface> extractor,
std::reference_wrapper<LoaderInterface> loader
);
~TaskManager();
void
run(Settings settings);
void
stop();
private:
void
wait();
[[nodiscard]] util::async::AnyOperation<void>
spawnExtractor(util::async::AnyStrand& strand, PriorityQueue& queue);
[[nodiscard]] util::async::AnyOperation<void>
spawnLoader(PriorityQueue& queue);
};
} // namespace etlng::impl

View File

@@ -50,7 +50,7 @@ void
SubscriptionManager::pubBookChanges(
ripple::LedgerHeader const& lgrInfo,
std::vector<data::TransactionAndMetadata> const& transactions
) const
)
{
bookChangesFeed_.pub(lgrInfo, transactions);
}
@@ -111,7 +111,7 @@ SubscriptionManager::pubLedger(
ripple::Fees const& fees,
std::string const& ledgerRange,
std::uint32_t const txnCount
) const
)
{
ledgerFeed_.pub(lgrInfo, fees, ledgerRange, txnCount);
}
@@ -129,7 +129,7 @@ SubscriptionManager::unsubManifest(SubscriberSharedPtr const& subscriber)
}
void
SubscriptionManager::forwardManifest(boost::json::object const& manifestJson) const
SubscriptionManager::forwardManifest(boost::json::object const& manifestJson)
{
manifestFeed_.pub(manifestJson);
}
@@ -147,7 +147,7 @@ SubscriptionManager::unsubValidation(SubscriberSharedPtr const& subscriber)
}
void
SubscriptionManager::forwardValidation(boost::json::object const& validationJson) const
SubscriptionManager::forwardValidation(boost::json::object const& validationJson)
{
validationsFeed_.pub(validationJson);
}

View File

@@ -115,6 +115,15 @@ public:
* @brief Destructor of the SubscriptionManager object. It will block until all running jobs finished.
*/
~SubscriptionManager() override
{
stop();
}
/**
* @brief Stop the SubscriptionManager and wait for all jobs to finish.
*/
void
stop() override
{
ctx_.stop();
ctx_.join();
@@ -141,7 +150,7 @@ public:
*/
void
pubBookChanges(ripple::LedgerHeader const& lgrInfo, std::vector<data::TransactionAndMetadata> const& transactions)
const final;
final;
/**
* @brief Subscribe to the proposed transactions feed.
@@ -209,7 +218,7 @@ public:
ripple::Fees const& fees,
std::string const& ledgerRange,
std::uint32_t txnCount
) const final;
) final;
/**
* @brief Subscribe to the manifest feed.
@@ -230,7 +239,7 @@ public:
* @param manifestJson The manifest json to forward.
*/
void
forwardManifest(boost::json::object const& manifestJson) const final;
forwardManifest(boost::json::object const& manifestJson) final;
/**
* @brief Subscribe to the validation feed.
@@ -251,7 +260,7 @@ public:
* @param validationJson The validation feed json to forward.
*/
void
forwardValidation(boost::json::object const& validationJson) const final;
forwardValidation(boost::json::object const& validationJson) final;
/**
* @brief Subscribe to the transactions feed.

View File

@@ -45,6 +45,12 @@ class SubscriptionManagerInterface {
public:
virtual ~SubscriptionManagerInterface() = default;
/**
* @brief Stop the SubscriptionManager and wait for all jobs to finish.
*/
virtual void
stop() = 0;
/**
* @brief Subscribe to the book changes feed.
* @param subscriber
@@ -65,8 +71,10 @@ public:
* @param transactions The transactions in the current ledger.
*/
virtual void
pubBookChanges(ripple::LedgerHeader const& lgrInfo, std::vector<data::TransactionAndMetadata> const& transactions)
const = 0;
pubBookChanges(
ripple::LedgerHeader const& lgrInfo,
std::vector<data::TransactionAndMetadata> const& transactions
) = 0;
/**
* @brief Subscribe to the proposed transactions feed.
@@ -135,7 +143,7 @@ public:
ripple::Fees const& fees,
std::string const& ledgerRange,
std::uint32_t txnCount
) const = 0;
) = 0;
/**
* @brief Subscribe to the manifest feed.
@@ -156,7 +164,7 @@ public:
* @param manifestJson The manifest json to forward.
*/
virtual void
forwardManifest(boost::json::object const& manifestJson) const = 0;
forwardManifest(boost::json::object const& manifestJson) = 0;
/**
* @brief Subscribe to the validation feed.
@@ -177,7 +185,7 @@ public:
* @param validationJson The validation feed json to forward.
*/
virtual void
forwardValidation(boost::json::object const& validationJson) const = 0;
forwardValidation(boost::json::object const& validationJson) = 0;
/**
* @brief Subscribe to the transactions feed.

View File

@@ -48,7 +48,7 @@ struct BookChangesFeed : public SingleFeedBase {
* @param transactions The transactions that were included in the ledger.
*/
void
pub(ripple::LedgerHeader const& lgrInfo, std::vector<data::TransactionAndMetadata> const& transactions) const
pub(ripple::LedgerHeader const& lgrInfo, std::vector<data::TransactionAndMetadata> const& transactions)
{
SingleFeedBase::pub(boost::json::serialize(rpc::computeBookChanges(lgrInfo, transactions)));
}

View File

@@ -36,7 +36,7 @@ struct ForwardFeed : public SingleFeedBase {
* @brief Publishes the json object.
*/
void
pub(boost::json::object const& json) const
pub(boost::json::object const& json)
{
SingleFeedBase::pub(boost::json::serialize(json));
}

View File

@@ -94,7 +94,7 @@ LedgerFeed::pub(
ripple::Fees const& fees,
std::string const& ledgerRange,
std::uint32_t const txnCount
) const
)
{
SingleFeedBase::pub(boost::json::serialize(makeLedgerPubMessage(lgrInfo, fees, ledgerRange, txnCount)));
}

View File

@@ -76,7 +76,7 @@ public:
pub(ripple::LedgerHeader const& lgrInfo,
ripple::Fees const& fees,
std::string const& ledgerRange,
std::uint32_t txnCount) const;
std::uint32_t txnCount);
private:
static boost::json::object

View File

@@ -62,7 +62,7 @@ SingleFeedBase::unsub(SubscriberSharedPtr const& subscriber)
}
void
SingleFeedBase::pub(std::string msg) const
SingleFeedBase::pub(std::string msg)
{
[[maybe_unused]] auto task = strand_.execute([this, msg = std::move(msg)]() {
auto const msgPtr = std::make_shared<std::string>(msg);

View File

@@ -73,7 +73,7 @@ public:
* @param msg The message.
*/
void
pub(std::string msg) const;
pub(std::string msg);
/**
* @brief Get the count of subscribers.

View File

@@ -19,6 +19,8 @@
#pragma once
#include "util/Mutex.hpp"
#include <boost/signals2.hpp>
#include <boost/signals2/connection.hpp>
#include <boost/signals2/variadic_signal.hpp>
@@ -45,8 +47,8 @@ class TrackableSignal {
// map of connection and signal connection, key is the pointer of the connection object
// allow disconnect to be called in the destructor of the connection
std::unordered_map<ConnectionPtr, boost::signals2::connection> connections_;
mutable std::mutex mutex_;
using ConnectionsMap = std::unordered_map<ConnectionPtr, boost::signals2::connection>;
util::Mutex<ConnectionsMap> connections_;
using SignalType = boost::signals2::signal<void(Args...)>;
SignalType signal_;
@@ -64,8 +66,8 @@ public:
bool
connectTrackableSlot(ConnectionSharedPtr const& trackable, std::function<void(Args...)> slot)
{
std::scoped_lock const lk(mutex_);
if (connections_.contains(trackable.get())) {
auto connections = connections_.template lock<std::scoped_lock>();
if (connections->contains(trackable.get())) {
return false;
}
@@ -73,7 +75,7 @@ public:
// the trackable's destructor. However, the trackable can not be destroied when the slot is being called
// either. track_foreign will hold a weak_ptr to the connection, which makes sure the connection is valid when
// the slot is called.
connections_.emplace(
connections->emplace(
trackable.get(), signal_.connect(typename SignalType::slot_type(slot).track_foreign(trackable))
);
return true;
@@ -89,10 +91,9 @@ public:
bool
disconnect(ConnectionPtr trackablePtr)
{
std::scoped_lock const lk(mutex_);
if (connections_.contains(trackablePtr)) {
connections_[trackablePtr].disconnect();
connections_.erase(trackablePtr);
if (auto connections = connections_.template lock<std::scoped_lock>(); connections->contains(trackablePtr)) {
connections->operator[](trackablePtr).disconnect();
connections->erase(trackablePtr);
return true;
}
return false;
@@ -115,8 +116,7 @@ public:
std::size_t
count() const
{
std::scoped_lock const lk(mutex_);
return connections_.size();
return connections_.template lock<std::scoped_lock>()->size();
}
};
} // namespace feed::impl

View File

@@ -20,6 +20,7 @@
#pragma once
#include "feed/impl/TrackableSignal.hpp"
#include "util/Mutex.hpp"
#include <boost/signals2.hpp>
@@ -49,8 +50,8 @@ class TrackableSignalMap {
using ConnectionPtr = Session*;
using ConnectionSharedPtr = std::shared_ptr<Session>;
mutable std::mutex mutex_;
std::unordered_map<Key, TrackableSignal<Session, Args...>> signalsMap_;
using SignalsMap = std::unordered_map<Key, TrackableSignal<Session, Args...>>;
util::Mutex<SignalsMap> signalsMap_;
public:
/**
@@ -66,8 +67,8 @@ public:
bool
connectTrackableSlot(ConnectionSharedPtr const& trackable, Key const& key, std::function<void(Args...)> slot)
{
std::scoped_lock const lk(mutex_);
return signalsMap_[key].connectTrackableSlot(trackable, slot);
auto map = signalsMap_.template lock<std::scoped_lock>();
return map->operator[](key).connectTrackableSlot(trackable, slot);
}
/**
@@ -80,14 +81,14 @@ public:
bool
disconnect(ConnectionPtr trackablePtr, Key const& key)
{
std::scoped_lock const lk(mutex_);
if (!signalsMap_.contains(key))
auto map = signalsMap_.template lock<std::scoped_lock>();
if (!map->contains(key))
return false;
auto const disconnected = signalsMap_[key].disconnect(trackablePtr);
auto const disconnected = map->operator[](key).disconnect(trackablePtr);
// clean the map if there is no connection left.
if (disconnected && signalsMap_[key].count() == 0)
signalsMap_.erase(key);
if (disconnected && map->operator[](key).count() == 0)
map->erase(key);
return disconnected;
}
@@ -101,9 +102,9 @@ public:
void
emit(Key const& key, Args const&... args)
{
std::scoped_lock const lk(mutex_);
if (signalsMap_.contains(key))
signalsMap_[key].emit(args...);
auto map = signalsMap_.template lock<std::scoped_lock>();
if (map->contains(key))
map->operator[](key).emit(args...);
}
};
} // namespace feed::impl

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
@@ -41,14 +41,15 @@ try {
return action.apply(
[](app::CliArgs::Action::Exit const& exit) { return exit.exitCode; },
[](app::CliArgs::Action::VerifyConfig const& verify) {
if (app::verifyConfig(verify.configPath)) {
std::cout << "Config " << verify.configPath << " is correct" << "\n";
if (app::parseConfig(verify.configPath)) {
std::cout << "Config " << verify.configPath << " is correct"
<< "\n";
return EXIT_SUCCESS;
}
return EXIT_FAILURE;
},
[](app::CliArgs::Action::Run const& run) {
if (app::verifyConfig(verify.configPath))
if (not app::parseConfig(run.configPath))
return EXIT_FAILURE;
util::LogService::init(gClioConfig);
@@ -56,7 +57,7 @@ try {
return clio.run(run.useNgWebServer);
},
[](app::CliArgs::Action::Migrate const& migrate) {
if (app::verifyConfig(verify.configPath))
if (not app::parseConfig(migrate.configPath))
return EXIT_FAILURE;
util::LogService::init(gClioConfig);

View File

@@ -5,4 +5,4 @@ target_sources(
cassandra/impl/ObjectsAdapter.cpp cassandra/impl/TransactionsAdapter.cpp
)
target_link_libraries(clio_migration PRIVATE clio_util clio_etl)
target_link_libraries(clio_migration PRIVATE clio_util clio_data)

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above

View File

@@ -0,0 +1,65 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "data/BackendInterface.hpp"
#include "migration/MigrationInspectorInterface.hpp"
#include "migration/MigratiorStatus.hpp"
#include "migration/cassandra/CassandraMigrationManager.hpp"
#include "util/Assert.hpp"
#include "util/log/Logger.hpp"
#include "util/newconfig/ConfigDefinition.hpp"
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <memory>
#include <utility>
namespace migration {
/**
* @brief A factory function that creates migration inspector instance and initializes the migration table if needed.
*
* @param config The config.
* @param backend The backend instance. It should be initialized before calling this function.
* @return A shared_ptr<MigrationInspectorInterface> instance
*/
inline std::shared_ptr<MigrationInspectorInterface>
makeMigrationInspector(
util::config::ClioConfigDefinition const& config,
std::shared_ptr<BackendInterface> const& backend
)
{
ASSERT(backend != nullptr, "Backend is not initialized");
auto inspector = std::make_shared<migration::cassandra::CassandraMigrationInspector>(backend);
// Database is empty, we need to initialize the migration table if it is a writeable backend
if (not config.get<bool>("read_only") and not backend->hardFetchLedgerRangeNoThrow()) {
migration::MigratorStatus const migrated(migration::MigratorStatus::Migrated);
for (auto const& name : inspector->allMigratorsNames()) {
backend->writeMigratorStatus(name, migrated.toString());
}
}
return inspector;
}
} // namespace migration

View File

@@ -0,0 +1,79 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "migration/MigratiorStatus.hpp"
#include <string>
#include <tuple>
#include <vector>
namespace migration {
/**
* @brief The interface for the migration inspector.The Clio server application will use this interface to inspect
* the migration status.
*/
struct MigrationInspectorInterface {
virtual ~MigrationInspectorInterface() = default;
/**
* @brief Get the status of all the migrators
* @return A vector of tuple, the first element is the migrator's name, the second element is the status of the
*/
virtual std::vector<std::tuple<std::string, MigratorStatus>>
allMigratorsStatusPairs() const = 0;
/**
* @brief Get all registered migrators' names
*
* @return A vector of migrators' names
*/
virtual std::vector<std::string>
allMigratorsNames() const = 0;
/**
* @brief Get the status of a migrator by its name
*
* @param name The migrator's name
* @return The status of the migrator
*/
virtual MigratorStatus
getMigratorStatusByName(std::string const& name) const = 0;
/**
* @brief Get the description of a migrator by its name
*
* @param name The migrator's name
* @return The description of the migrator
*/
virtual std::string
getMigratorDescriptionByName(std::string const& name) const = 0;
/**
* @brief Return if Clio server is blocked
*
* @return True if Clio server is blocked by migration, false otherwise
*/
virtual bool
isBlockingClio() const = 0;
};
} // namespace migration

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
@@ -19,59 +19,23 @@
#pragma once
#include "migration/MigratiorStatus.hpp"
#include "migration/MigrationInspectorInterface.hpp"
#include <string>
#include <tuple>
#include <vector>
namespace migration {
/**
* @brief The interface for the migration manager. This interface is tend to be implemented for specific database. The
* application layer will use this interface to run the migrations.
* @brief The interface for the migration manager. The migration application layer will use this interface to run the
* migrations. Unlike the MigrationInspectorInterface which only provides the status of migration, this interface
* contains the acutal migration running method.
*/
struct MigrationManagerInterface {
virtual ~MigrationManagerInterface() = default;
struct MigrationManagerInterface : virtual public MigrationInspectorInterface {
/**
* @brief Run the the migration according to the given migrator's name
*/
virtual void
runMigration(std::string const&) = 0;
/**
* @brief Get the status of all the migrators
* @return A vector of tuple, the first element is the migrator's name, the second element is the status of the
*/
virtual std::vector<std::tuple<std::string, MigratorStatus>>
allMigratorsStatusPairs() const = 0;
/**
* @brief Get all registered migrators' names
*
* @return A vector of migrators' names
*/
virtual std::vector<std::string>
allMigratorsNames() const = 0;
/**
* @brief Get the status of a migrator by its name
*
* @param name The migrator's name
* @return The status of the migrator
*/
virtual MigratorStatus
getMigratorStatusByName(std::string const& name) const = 0;
/**
* @brief Get the description of a migrator by its name
*
* @param name The migrator's name
* @return The description of the migrator
*/
virtual std::string
getMigratorDescriptionByName(std::string const& name) const = 0;
};
} // namespace migration

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above

View File

@@ -40,9 +40,11 @@ A migrator satisfies the `MigratorSpec`(impl/Spec.hpp) concept.
It contains:
- A `name` which will be used to identify the migrator. User will refer this migrator in command-line tool by this name. The name needs to be different with other migrators, otherwise a compilation error will be raised.
- A `kNAME` which will be used to identify the migrator. User will refer this migrator in command-line tool by this name. The name needs to be different with other migrators, otherwise a compilation error will be raised.
- A `description` which is the detail information of the migrator.
- A `kDESCRIPTION` which is the detail information of the migrator.
- An optional `kCAN_BLOCK_CLIO` which indicates whether the migrator can block the Clio server. If it's absent, the migrator can't block server. If there is a blocking migrator not completed, the Clio server will fail to start.
- A static function `runMigration`, it will be called when user run `--migrate name`. It accepts two parameters: backend, which provides the DB operations interface, and cfg, which provides migration-related configuration. Each migrator can have its own configuration under `.migration` session.
@@ -65,8 +67,8 @@ Most indexes are based on either ledger states or transactions. We provide the `
If you need to do full scan against other table, you can follow below steps:
- Describe the table which needs full scan in a struct. It has to satisfy the `TableSpec`(cassandra/Spec.hpp) concept, containing static member:
- Tuple type `Row`, it's the type of each field in a row. The order of types should match what database will return in a row. Key types should come first, followed by other field types sorted in alphabetical order.
- `PARTITION_KEY`, it's the name of the partition key of the table.
- `TABLE_NAME`
- `kPARTITION_KEY`, it's the name of the partition key of the table.
- `kTABLE_NAME`
- Inherent from `FullTableScannerAdapterBase`.
- Implement `onRowRead`, its parameter is the `Row` we defined. It's the callback function when a row is read.

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
@@ -19,20 +19,32 @@
#pragma once
#include "data/BackendInterface.hpp"
#include "migration/cassandra/CassandraMigrationBackend.hpp"
#include "migration/impl/MigrationInspectorBase.hpp"
#include "migration/impl/MigrationManagerBase.hpp"
#include "migration/impl/MigratorsRegister.hpp"
namespace migration::cassandra {
namespace {
// Register migrators here
// MigratorsRegister<BackendType, ExampleMigrator>
template <typename BackendType>
using CassandraSupportedMigrators = migration::impl::MigratorsRegister<BackendType>;
// Register with MigrationBackend which proceeds the migration
using MigrationProcesser = CassandraSupportedMigrators<CassandraMigrationBackend>;
// Instantiates with the backend which supports actual migration running
using MigrationProcesser = CassandraSupportedMigrators<migration::cassandra::CassandraMigrationBackend>;
// Instantiates with backend interface, it doesn't support actual migration. But it can be used to inspect the migrators
// status
using MigrationQuerier = CassandraSupportedMigrators<data::BackendInterface>;
} // namespace
namespace migration::cassandra {
using CassandraMigrationInspector = migration::impl::MigrationInspectorBase<MigrationQuerier>;
// The Cassandra migration manager
using CassandraMigrationManager = migration::impl::MigrationManagerBase<MigrationProcesser>;
} // namespace migration::cassandra

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above

View File

@@ -0,0 +1,121 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "migration/MigrationInspectorInterface.hpp"
#include "migration/MigratiorStatus.hpp"
#include <memory>
#include <ranges>
#include <string>
#include <tuple>
#include <vector>
namespace migration::impl {
/**
* @brief The migration inspector implementation for Cassandra. It will report the migration status for Cassandra
* database.
*
* @tparam SupportedMigrators The migrators resgister that contains all the migrators
*/
template <typename SupportedMigrators>
class MigrationInspectorBase : virtual public MigrationInspectorInterface {
protected:
SupportedMigrators migrators_;
public:
/**
* @brief Construct a new Cassandra Migration Inspector object
*
* @param backend The backend of the Cassandra database
*/
explicit MigrationInspectorBase(std::shared_ptr<typename SupportedMigrators::BackendType> backend)
: migrators_{std::move(backend)}
{
}
/**
* @brief Get the status of all the migrators
*
* @return A vector of tuple, the first element is the migrator's name, the second element is the status of the
* migrator
*/
std::vector<std::tuple<std::string, MigratorStatus>>
allMigratorsStatusPairs() const override
{
return migrators_.getMigratorsStatus();
}
/**
* @brief Get the status of a migrator by its name
*
* @param name The name of the migrator
* @return The status of the migrator
*/
MigratorStatus
getMigratorStatusByName(std::string const& name) const override
{
return migrators_.getMigratorStatus(name);
}
/**
* @brief Get all registered migrators' names
*
* @return A vector of string, the names of all the migrators
*/
std::vector<std::string>
allMigratorsNames() const override
{
auto const names = migrators_.getMigratorNames();
return std::vector<std::string>{names.begin(), names.end()};
}
/**
* @brief Get the description of a migrator by its name
*
* @param name The name of the migrator
* @return The description of the migrator
*/
std::string
getMigratorDescriptionByName(std::string const& name) const override
{
return migrators_.getMigratorDescription(name);
}
/**
* @brief Return if there is uncomplete migrator blocking the server
*
* @return True if server is blocked, false otherwise
*/
bool
isBlockingClio() const override
{
return std::ranges::any_of(migrators_.getMigratorNames(), [&](auto const& migrator) {
if (auto canBlock = migrators_.canMigratorBlockClio(migrator); canBlock.has_value() and *canBlock and
migrators_.getMigratorStatus(std::string(migrator)) == MigratorStatus::Status::NotMigrated) {
return true;
}
return false;
});
}
};
} // namespace migration::impl

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
@@ -20,14 +20,12 @@
#pragma once
#include "migration/MigrationManagerInterface.hpp"
#include "migration/MigratiorStatus.hpp"
#include "migration/impl/MigrationInspectorBase.hpp"
#include "util/newconfig/ObjectView.hpp"
#include <memory>
#include <string>
#include <tuple>
#include <utility>
#include <vector>
namespace migration::impl {
@@ -38,8 +36,7 @@ namespace migration::impl {
* @tparam SupportedMigrators The migrators resgister that contains all the migrators
*/
template <typename SupportedMigrators>
class MigrationManagerBase : public MigrationManagerInterface {
SupportedMigrators migrators_;
class MigrationManagerBase : public MigrationManagerInterface, public MigrationInspectorBase<SupportedMigrators> {
// contains only migration related settings
util::config::ObjectView config_;
@@ -54,7 +51,7 @@ public:
std::shared_ptr<typename SupportedMigrators::BackendType> backend,
util::config::ObjectView config
)
: migrators_{backend}, config_{std::move(config)}
: MigrationInspectorBase<SupportedMigrators>{std::move(backend)}, config_{std::move(config)}
{
}
@@ -66,55 +63,7 @@ public:
void
runMigration(std::string const& name) override
{
migrators_.runMigrator(name, config_);
}
/**
* @brief Get the status of all the migrators
*
* @return A vector of tuple, the first element is the migrator's name, the second element is the status of the
* migrator
*/
std::vector<std::tuple<std::string, MigratorStatus>>
allMigratorsStatusPairs() const override
{
return migrators_.getMigratorsStatus();
}
/**
* @brief Get the status of a migrator by its name
*
* @param name The name of the migrator
* @return The status of the migrator
*/
MigratorStatus
getMigratorStatusByName(std::string const& name) const override
{
return migrators_.getMigratorStatus(name);
}
/**
* @brief Get all registered migrators' names
*
* @return A vector of string, the names of all the migrators
*/
std::vector<std::string>
allMigratorsNames() const override
{
auto const names = migrators_.getMigratorNames();
return std::vector<std::string>{names.begin(), names.end()};
}
/**
* @brief Get the description of a migrator by its name
*
* @param name The name of the migrator
* @return The description of the migrator
*/
std::string
getMigratorDescriptionByName(std::string const& name) const override
{
return migrators_.getMigratorDescription(name);
this->migrators_.runMigrator(name, config_);
}
};

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
@@ -22,6 +22,7 @@
#include "data/BackendInterface.hpp"
#include "migration/MigratiorStatus.hpp"
#include "migration/impl/Spec.hpp"
#include "util/Assert.hpp"
#include "util/Concepts.hpp"
#include "util/log/Logger.hpp"
#include "util/newconfig/ObjectView.hpp"
@@ -30,10 +31,12 @@
#include <array>
#include <iterator>
#include <memory>
#include <optional>
#include <ranges>
#include <string>
#include <string_view>
#include <tuple>
#include <utility>
#include <vector>
namespace migration::impl {
@@ -47,6 +50,11 @@ concept MigrationBackend = requires { requires std::same_as<typename MigratorTyp
template <typename Backend, typename... MigratorType>
concept BackendMatchAllMigrators = (MigrationBackend<Backend, MigratorType> && ...);
template <typename T>
concept HasCanBlockClio = requires(T t) {
{ t.kCAN_BLOCK_CLIO };
};
/**
*@brief The register of migrators. It will dispatch the migration to the corresponding migrator. It also
*hold the shared pointer of backend, which is used by the migrators.
@@ -81,6 +89,23 @@ class MigratorsRegister {
return (T::kNAME == targetName) ? T::kDESCRIPTION : "";
}
template <typename First, typename... Rest>
static constexpr bool
canBlockClioHelper(std::string_view targetName)
{
if (targetName == First::kNAME) {
if constexpr (HasCanBlockClio<First>) {
return First::kCAN_BLOCK_CLIO;
}
return false;
}
if constexpr (sizeof...(Rest) > 0) {
return canBlockClioHelper<Rest...>(targetName);
}
ASSERT(false, "The migrator name is not found");
std::unreachable();
}
public:
/**
* @brief The backend type which is used by the migrators
@@ -179,6 +204,27 @@ public:
return result.empty() ? "No Description" : result;
}
}
/**
* @brief Return if the given migrator can block Clio server
*
* @param name The migrator's name
* @return std::nullopt if the migrator name is not found, or a boolean value indicating whether the migrator is
* blocking Clio server.
*/
std::optional<bool>
canMigratorBlockClio(std::string_view name) const
{
if constexpr (sizeof...(MigratorType) == 0) {
return std::nullopt;
} else {
auto const migratiors = getMigratorNames();
if (std::ranges::find(migratiors, name) == migratiors.end())
return std::nullopt;
return canBlockClioHelper<MigratorType...>(name);
}
}
};
} // namespace migration::impl

View File

@@ -2,7 +2,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above

View File

@@ -79,7 +79,6 @@ getErrorInfo(ClioError code)
{.code = ClioError::RpcMalformedRequest, .error = "malformedRequest", .message = "Malformed request."},
{.code = ClioError::RpcMalformedOwner, .error = "malformedOwner", .message = "Malformed owner."},
{.code = ClioError::RpcMalformedAddress, .error = "malformedAddress", .message = "Malformed address."},
{.code = ClioError::RpcInvalidHotWallet, .error = "invalidHotWallet", .message = "Invalid hot wallet."},
{.code = ClioError::RpcUnknownOption, .error = "unknownOption", .message = "Unknown option."},
{.code = ClioError::RpcFieldNotFoundTransaction,
.error = "fieldNotFoundTransaction",
@@ -90,6 +89,7 @@ getErrorInfo(ClioError code)
{.code = ClioError::RpcMalformedAuthorizedCredentials,
.error = "malformedAuthorizedCredentials",
.message = "Malformed authorized credentials."},
// special system errors
{.code = ClioError::RpcInvalidApiVersion, .error = JS(invalid_API_version), .message = "Invalid API version."},
{.code = ClioError::RpcCommandIsMissing,

View File

@@ -39,7 +39,6 @@ enum class ClioError {
RpcMalformedRequest = 5001,
RpcMalformedOwner = 5002,
RpcMalformedAddress = 5003,
RpcInvalidHotWallet = 5004,
RpcUnknownOption = 5005,
RpcFieldNotFoundTransaction = 5006,
RpcMalformedOracleDocumentId = 5007,

View File

@@ -25,6 +25,7 @@
#include "rpc/common/JsonBool.hpp"
#include "rpc/common/MetaProcessors.hpp"
#include "rpc/common/Modifiers.hpp"
#include "rpc/common/Specs.hpp"
#include "rpc/common/Types.hpp"
#include "rpc/common/Validators.hpp"
#include "util/TxUtils.hpp"
@@ -39,7 +40,6 @@
#include <xrpl/protocol/jss.h>
#include <cstdint>
#include <limits>
#include <memory>
#include <optional>
#include <string>
@@ -57,8 +57,8 @@ class AccountTxHandler {
std::shared_ptr<BackendInterface> sharedPtrBackend_;
public:
// no max limit
static constexpr auto kLIMIT_MIN = 1;
static constexpr auto kLIMIT_MAX = 1000;
static constexpr auto kLIMIT_DEFAULT = 200;
/**
@@ -133,7 +133,7 @@ public:
{JS(limit),
validation::Type<uint32_t>{},
validation::Min(1u),
modifiers::Clamp<int32_t>{kLIMIT_MIN, std::numeric_limits<int32_t>::max()}},
modifiers::Clamp<int32_t>{kLIMIT_MIN, kLIMIT_MAX}},
{JS(marker),
meta::WithCustomError{
validation::Type<boost::json::object>{},

View File

@@ -145,10 +145,6 @@ GatewayBalancesHandler::process(GatewayBalancesHandler::Input input, Context con
if (auto status = std::get_if<Status>(&ret))
return Error{*status};
auto inHotbalances = [&](auto const& hw) { return output.hotBalances.contains(hw); };
if (not std::ranges::all_of(input.hotWallets, inHotbalances))
return Error{Status{ClioError::RpcInvalidHotWallet}};
output.accountID = input.account;
output.ledgerHash = ripple::strHex(lgrInfo.hash);
output.ledgerIndex = lgrInfo.seq;

View File

@@ -108,44 +108,51 @@ public:
static RpcSpecConstRef
spec([[maybe_unused]] uint32_t apiVersion)
{
static auto const kHOT_WALLET_VALIDATOR =
validation::CustomValidator{[](boost::json::value const& value, std::string_view key) -> MaybeError {
if (!value.is_string() && !value.is_array())
return Error{Status{RippledError::rpcINVALID_PARAMS, std::string(key) + "NotStringOrArray"}};
auto const getHotWalletValidator = [](RippledError errCode) {
return validation::CustomValidator{
[errCode](boost::json::value const& value, std::string_view key) -> MaybeError {
if (!value.is_string() && !value.is_array())
return Error{Status{errCode, std::string(key) + "NotStringOrArray"}};
// wallet needs to be an valid accountID or public key
auto const wallets = value.is_array() ? value.as_array() : boost::json::array{value};
auto const getAccountID = [](auto const& j) -> std::optional<ripple::AccountID> {
if (j.is_string()) {
auto const pk = util::parseBase58Wrapper<ripple::PublicKey>(
ripple::TokenType::AccountPublic, boost::json::value_to<std::string>(j)
);
// wallet needs to be an valid accountID or public key
auto const wallets = value.is_array() ? value.as_array() : boost::json::array{value};
auto const getAccountID = [](auto const& j) -> std::optional<ripple::AccountID> {
if (j.is_string()) {
auto const pk = util::parseBase58Wrapper<ripple::PublicKey>(
ripple::TokenType::AccountPublic, boost::json::value_to<std::string>(j)
);
if (pk)
return ripple::calcAccountID(*pk);
if (pk)
return ripple::calcAccountID(*pk);
return util::parseBase58Wrapper<ripple::AccountID>(boost::json::value_to<std::string>(j));
return util::parseBase58Wrapper<ripple::AccountID>(boost::json::value_to<std::string>(j));
}
return {};
};
for (auto const& wallet : wallets) {
if (!getAccountID(wallet))
return Error{Status{errCode, std::string(key) + "Malformed"}};
}
return {};
};
for (auto const& wallet : wallets) {
if (!getAccountID(wallet))
return Error{Status{RippledError::rpcINVALID_PARAMS, std::string(key) + "Malformed"}};
return MaybeError{};
}
return MaybeError{};
}};
static auto const kRPC_SPEC = RpcSpec{
{JS(account), validation::Required{}, validation::CustomValidators::accountValidator},
{JS(ledger_hash), validation::CustomValidators::uint256HexStringValidator},
{JS(ledger_index), validation::CustomValidators::ledgerIndexValidator},
{JS(hotwallet), kHOT_WALLET_VALIDATOR}
};
};
return kRPC_SPEC;
static auto const kSPEC_COMMON = RpcSpec{
{JS(account), validation::Required{}, validation::CustomValidators::accountValidator},
{JS(ledger_hash), validation::CustomValidators::uint256HexStringValidator},
{JS(ledger_index), validation::CustomValidators::ledgerIndexValidator}
};
auto static const kSPEC_V1 =
RpcSpec{kSPEC_COMMON, {{JS(hotwallet), getHotWalletValidator(ripple::rpcINVALID_HOTWALLET)}}};
auto static const kSPEC_V2 =
RpcSpec{kSPEC_COMMON, {{JS(hotwallet), getHotWalletValidator(ripple::rpcINVALID_PARAMS)}}};
return apiVersion == 1 ? kSPEC_V1 : kSPEC_V2;
}
/**

View File

@@ -179,6 +179,12 @@ LedgerEntryHandler::process(LedgerEntryHandler::Input input, Context const& ctx)
ripple::uint192{std::string_view(boost::json::value_to<std::string>(input.mptoken->at(JS(mpt_issuance_id))))
};
key = ripple::keylet::mptoken(mptIssuanceID, *holder).key;
} else if (input.permissionedDomain) {
auto const account = ripple::parseBase58<ripple::AccountID>(
boost::json::value_to<std::string>(input.permissionedDomain->at(JS(account)))
);
auto const seq = input.permissionedDomain->at(JS(seq)).as_int64();
key = ripple::keylet::permissionedDomain(*account, seq).key;
} else {
// Must specify 1 of the following fields to indicate what type
if (ctx.apiVersion == 1)
@@ -313,6 +319,7 @@ tag_invoke(boost::json::value_to_tag<LedgerEntryHandler::Input>, boost::json::va
{JS(oracle), ripple::ltORACLE},
{JS(credential), ripple::ltCREDENTIAL},
{JS(mptoken), ripple::ltMPTOKEN},
{JS(permissioned_domain), ripple::ltPERMISSIONED_DOMAIN}
};
auto const parseBridgeFromJson = [](boost::json::value const& bridgeJson) {
@@ -399,6 +406,8 @@ tag_invoke(boost::json::value_to_tag<LedgerEntryHandler::Input>, boost::json::va
input.credential = parseCredentialFromJson(jv.at(JS(credential)));
} else if (jsonObject.contains(JS(mptoken))) {
input.mptoken = jv.at(JS(mptoken)).as_object();
} else if (jsonObject.contains(JS(permissioned_domain))) {
input.permissionedDomain = jv.at(JS(permissioned_domain)).as_object();
}
if (jsonObject.contains("include_deleted"))

View File

@@ -103,6 +103,7 @@ public:
std::optional<boost::json::object> ticket;
std::optional<boost::json::object> amm;
std::optional<boost::json::object> mptoken;
std::optional<boost::json::object> permissionedDomain;
std::optional<ripple::STXChainBridge> bridge;
std::optional<std::string> bridgeAccount;
std::optional<uint32_t> chainClaimId;
@@ -374,6 +375,23 @@ public:
},
},
}},
{JS(permissioned_domain),
meta::WithCustomError{
validation::Type<std::string, boost::json::object>{}, Status(ClioError::RpcMalformedRequest)
},
meta::IfType<std::string>{kMALFORMED_REQUEST_HEX_STRING_VALIDATOR},
meta::IfType<boost::json::object>{meta::Section{
{JS(seq),
meta::WithCustomError{validation::Required{}, Status(ClioError::RpcMalformedRequest)},
meta::WithCustomError{validation::Type<uint32_t>{}, Status(ClioError::RpcMalformedRequest)}},
{
JS(account),
meta::WithCustomError{validation::Required{}, Status(ClioError::RpcMalformedRequest)},
meta::WithCustomError{
validation::CustomValidators::accountBase58Validator, Status(ClioError::RpcMalformedAddress)
},
},
}}},
{JS(ledger), check::Deprecated{}},
{"include_deleted", validation::Type<bool>{}},
};

View File

@@ -22,6 +22,7 @@ target_sources(
requests/impl/SslContext.cpp
ResponseExpirationCache.cpp
SignalsHandler.cpp
StopHelper.cpp
Taggable.cpp
TerminationHandler.cpp
TimeUtils.cpp

View File

@@ -117,6 +117,7 @@ class LedgerTypes {
LedgerTypeAttribute::chainLedgerType(JS(nunl), ripple::ltNEGATIVE_UNL),
LedgerTypeAttribute::deletionBlockerLedgerType(JS(mpt_issuance), ripple::ltMPTOKEN_ISSUANCE),
LedgerTypeAttribute::deletionBlockerLedgerType(JS(mptoken), ripple::ltMPTOKEN),
LedgerTypeAttribute::deletionBlockerLedgerType(JS(permissioned_domain), ripple::ltPERMISSIONED_DOMAIN),
};
public:

View File

@@ -34,7 +34,7 @@ class Mutex;
* @tparam LockType type of lock
* @tparam MutexType type of mutex
*/
template <typename ProtectedDataType, template <typename> typename LockType, typename MutexType>
template <typename ProtectedDataType, template <typename...> typename LockType, typename MutexType>
class Lock {
LockType<MutexType> lock_;
ProtectedDataType& data_;
@@ -129,7 +129,7 @@ public:
* @tparam LockType The type of lock to use
* @return A lock on the mutex and a reference to the protected data
*/
template <template <typename> typename LockType = std::lock_guard>
template <template <typename...> typename LockType = std::lock_guard>
Lock<ProtectedDataType const, LockType, MutexType>
lock() const
{
@@ -142,7 +142,7 @@ public:
* @tparam LockType The type of lock to use
* @return A lock on the mutex and a reference to the protected data
*/
template <template <typename> typename LockType = std::lock_guard>
template <template <typename...> typename LockType = std::lock_guard>
Lock<ProtectedDataType, LockType, MutexType>
lock()
{

46
src/util/StopHelper.cpp Normal file
View File

@@ -0,0 +1,46 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "util/StopHelper.hpp"
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
#include <chrono>
namespace util {
void
StopHelper::readyToStop()
{
onStopReady_();
*stopped_ = true;
}
void
StopHelper::asyncWaitForStop(boost::asio::yield_context yield)
{
boost::asio::steady_timer timer{yield.get_executor(), std::chrono::steady_clock::duration::max()};
onStopReady_.connect([&timer]() { timer.cancel(); });
boost::system::error_code error;
if (!*stopped_)
timer.async_wait(yield[error]);
}
} // namespace util

54
src/util/StopHelper.hpp Normal file
View File

@@ -0,0 +1,54 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <boost/asio/spawn.hpp>
#include <boost/signals2/signal.hpp>
#include <boost/signals2/variadic_signal.hpp>
#include <atomic>
#include <memory>
namespace util {
/**
* @brief Helper class to stop a class asynchronously.
*/
class StopHelper {
boost::signals2::signal<void()> onStopReady_;
std::unique_ptr<std::atomic_bool> stopped_ = std::make_unique<std::atomic_bool>(false);
public:
/**
* @brief Notify that the class is ready to stop.
*/
void
readyToStop();
/**
* @brief Wait for the class to stop.
*
* @param yield The coroutine context
*/
void
asyncWaitForStop(boost::asio::yield_context yield);
};
} // namespace util

View File

@@ -0,0 +1,116 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/async/AnyStrand.hpp"
#include <cstddef>
#include <functional>
#include <optional>
#include <queue>
#include <type_traits>
#include <utility>
#include <vector>
namespace util {
/**
* @brief A wrapper for std::priority_queue that serialises operations using a strand
* @note This may be a candidate for future improvements if performance proves to be poor (e.g. use a lock free queue)
*/
template <typename T, typename Compare = std::less<T>>
class StrandedPriorityQueue {
util::async::AnyStrand strand_;
std::size_t limit_;
std::priority_queue<T, std::vector<T>, Compare> queue_;
public:
/**
* @brief Construct a new priority queue on a strand
* @param strand The strand to use
* @param limit The limit of items allowed simultaniously in the queue
*/
StrandedPriorityQueue(util::async::AnyStrand&& strand, std::optional<std::size_t> limit = std::nullopt)
: strand_(std::move(strand)), limit_(limit.value_or(0uz))
{
}
/**
* @brief Enqueue a new item onto the queue if space is available
* @note This function blocks until the item is attempted to be added to the queue
*
* @tparam I Type of the item to add
* @param item The item to add
* @return true if item added to the queue; false otherwise
*/
template <typename I>
[[nodiscard]] bool
enqueue(I&& item)
requires std::is_same_v<std::decay_t<I>, T>
{
return strand_
.execute([&item, this] {
if (limit_ == 0uz or queue_.size() < limit_) {
queue_.push(std::forward<I>(item));
return true;
}
return false;
})
.get()
.value_or(false); // if some exception happens - failed to add
}
/**
* @brief Dequeue the next available item out of the queue
* @note This function blocks until the item is taken off the queue
* @return An item if available; nullopt otherwise
*/
[[nodiscard]] std::optional<T>
dequeue()
{
return strand_
.execute([this] -> std::optional<T> {
std::optional<T> out;
if (not queue_.empty()) {
out.emplace(queue_.top());
queue_.pop();
}
return out;
})
.get()
.value_or(std::nullopt);
}
/**
* @brief Check if the queue is empty
* @note This function blocks until the queue is checked
*
* @return true if the queue is empty; false otherwise
*/
[[nodiscard]] bool
empty()
{
return strand_.execute([this] { return queue_.empty(); }).get().value();
}
};
} // namespace util

View File

@@ -19,6 +19,7 @@
#pragma once
#include "util/async/AnyOperation.hpp"
#include "util/async/AnyStopToken.hpp"
#include "util/async/Concepts.hpp"
#include "util/async/impl/ErasedOperation.hpp"
@@ -61,7 +62,7 @@ public:
* @return The type-erased operation
*/
[[nodiscard]] auto
execute(SomeHandlerWithoutStopToken auto&& fn) const
execute(SomeHandlerWithoutStopToken auto&& fn)
{
using RetType = std::decay_t<decltype(fn())>;
static_assert(not std::is_same_v<RetType, std::any>);
@@ -85,7 +86,7 @@ public:
* @return The type-erased operation
*/
[[nodiscard]] auto
execute(SomeHandlerWith<AnyStopToken> auto&& fn) const
execute(SomeHandlerWith<AnyStopToken> auto&& fn)
{
using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>()))>;
static_assert(not std::is_same_v<RetType, std::any>);
@@ -110,7 +111,7 @@ public:
* @return The type-erased operation
*/
[[nodiscard]] auto
execute(SomeHandlerWith<AnyStopToken> auto&& fn, SomeStdDuration auto timeout) const
execute(SomeHandlerWith<AnyStopToken> auto&& fn, SomeStdDuration auto timeout)
{
using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>()))>;
static_assert(not std::is_same_v<RetType, std::any>);
@@ -137,7 +138,7 @@ private:
[[nodiscard]] virtual impl::ErasedOperation
execute(std::function<std::any(AnyStopToken)>, std::optional<std::chrono::milliseconds> timeout = std::nullopt)
const = 0;
[[nodiscard]] virtual impl::ErasedOperation execute(std::function<std::any()>) const = 0;
[[nodiscard]] virtual impl::ErasedOperation execute(std::function<std::any()>) = 0;
};
template <typename StrandType>
@@ -158,7 +159,7 @@ private:
}
[[nodiscard]] impl::ErasedOperation
execute(std::function<std::any()> fn) const override
execute(std::function<std::any()> fn) override
{
return strand.execute(std::move(fn));
}

View File

@@ -362,6 +362,10 @@ static constinit NumberValueConstraint<uint16_t> gValidateUint16{
std::numeric_limits<uint16_t>::min(),
std::numeric_limits<uint16_t>::max()
};
// log file size minimum is 1mb, log rotation time minimum is 1hr
static constinit NumberValueConstraint<uint32_t> gValidateLogSize{1, std::numeric_limits<uint32_t>::max()};
static constinit NumberValueConstraint<uint32_t> gValidateLogRotationTime{1, std::numeric_limits<uint32_t>::max()};
static constinit NumberValueConstraint<uint32_t> gValidateUint32{
std::numeric_limits<uint32_t>::min(),
std::numeric_limits<uint32_t>::max()

View File

@@ -350,8 +350,9 @@ static ClioConfigDefinition gClioConfig = ClioConfigDefinition{
{"server.admin_password", ConfigValue{ConfigType::String}.optional()},
{"server.processing_policy",
ConfigValue{ConfigType::String}.defaultValue("parallel").withConstraint(gValidateProcessingPolicy)},
{"server.parallel_requests_limit", ConfigValue{ConfigType::Integer}.optional()},
{"server.ws_max_sending_queue_size", ConfigValue{ConfigType::Integer}.defaultValue(1500)},
{"server.parallel_requests_limit", ConfigValue{ConfigType::Integer}.optional().withConstraint(gValidateUint16)},
{"server.ws_max_sending_queue_size",
ConfigValue{ConfigType::Integer}.defaultValue(1500).withConstraint(gValidateUint32)},
{"server.__ng_web_server", ConfigValue{ConfigType::Boolean}.defaultValue(false)},
{"prometheus.enabled", ConfigValue{ConfigType::Boolean}.defaultValue(true)},
@@ -387,12 +388,13 @@ static ClioConfigDefinition gClioConfig = ClioConfigDefinition{
{"log_directory", ConfigValue{ConfigType::String}.optional()},
{"log_rotation_size", ConfigValue{ConfigType::Integer}.defaultValue(2048).withConstraint(gValidateUint32)},
{"log_rotation_size", ConfigValue{ConfigType::Integer}.defaultValue(2048).withConstraint(gValidateLogSize)},
{"log_directory_max_size", ConfigValue{ConfigType::Integer}.defaultValue(50 * 1024).withConstraint(gValidateUint32)
},
{"log_directory_max_size",
ConfigValue{ConfigType::Integer}.defaultValue(50 * 1024).withConstraint(gValidateLogSize)},
{"log_rotation_hour_interval", ConfigValue{ConfigType::Integer}.defaultValue(12).withConstraint(gValidateUint32)},
{"log_rotation_hour_interval",
ConfigValue{ConfigType::Integer}.defaultValue(12).withConstraint(gValidateLogRotationTime)},
{"log_tag_style", ConfigValue{ConfigType::String}.defaultValue("none").withConstraint(gValidateLogTag)},

View File

@@ -23,7 +23,6 @@
#include "util/newconfig/Error.hpp"
#include "util/newconfig/Types.hpp"
#include <boost/filesystem/path.hpp>
#include <boost/json/array.hpp>
#include <boost/json/object.hpp>
#include <boost/json/parse.hpp>
@@ -34,10 +33,9 @@
#include <cstddef>
#include <cstdint>
#include <exception>
#include <filesystem>
#include <fstream>
#include <ios>
#include <iostream>
#include <ostream>
#include <sstream>
#include <string>
#include <string_view>
@@ -82,7 +80,7 @@ ConfigFileJson::ConfigFileJson(boost::json::object jsonObj)
}
std::expected<ConfigFileJson, Error>
ConfigFileJson::makeConfigFileJson(boost::filesystem::path configFilePath)
ConfigFileJson::makeConfigFileJson(std::filesystem::path const& configFilePath)
{
try {
if (auto const in = std::ifstream(configFilePath.string(), std::ios::in | std::ios::binary); in) {

View File

@@ -23,10 +23,10 @@
#include "util/newconfig/Error.hpp"
#include "util/newconfig/Types.hpp"
#include <boost/filesystem/path.hpp>
#include <boost/json/object.hpp>
#include <expected>
#include <filesystem>
#include <string>
#include <string_view>
#include <vector>
@@ -79,7 +79,7 @@ public:
* @return A ConfigFileJson object if parsing user file is successful. Error otherwise
*/
[[nodiscard]] static std::expected<ConfigFileJson, Error>
makeConfigFileJson(boost::filesystem::path configFilePath);
makeConfigFileJson(std::filesystem::path const& configFilePath);
private:
/**

View File

@@ -21,6 +21,7 @@
#include "util/Assert.hpp"
#include "util/Concepts.hpp"
#include "util/Mutex.hpp"
#include "util/prometheus/OStream.hpp"
#include <cstdint>
@@ -61,28 +62,30 @@ public:
void
setBuckets(std::vector<ValueType> const& bounds)
{
std::scoped_lock const lock{*mutex_};
ASSERT(buckets_.empty(), "Buckets can be set only once.");
buckets_.reserve(bounds.size());
auto data = data_->template lock<std::scoped_lock>();
ASSERT(data->buckets.empty(), "Buckets can be set only once.");
data->buckets.reserve(bounds.size());
for (auto const& bound : bounds) {
buckets_.emplace_back(bound);
data->buckets.emplace_back(bound);
}
}
void
observe(ValueType const value)
{
auto const bucket =
std::lower_bound(buckets_.begin(), buckets_.end(), value, [](Bucket const& bucket, ValueType const& value) {
return bucket.upperBound < value;
});
std::scoped_lock const lock{*mutex_};
if (bucket != buckets_.end()) {
auto data = data_->template lock<std::scoped_lock>();
auto const bucket = std::lower_bound(
data->buckets.begin(),
data->buckets.end(),
value,
[](Bucket const& bucket, ValueType const& value) { return bucket.upperBound < value; }
);
if (bucket != data->buckets.end()) {
++bucket->count;
} else {
++lastBucket_.count;
++data->lastBucket.count;
}
sum_ += value;
data->sum += value;
}
void
@@ -98,15 +101,15 @@ public:
labelsString.back() = ',';
}
std::scoped_lock const lock{*mutex_};
auto data = data_->template lock<std::scoped_lock>();
std::uint64_t cumulativeCount = 0;
for (auto const& bucket : buckets_) {
for (auto const& bucket : data->buckets) {
cumulativeCount += bucket.count;
stream << name << "_bucket" << labelsString << "le=\"" << bucket.upperBound << "\"} " << cumulativeCount
<< '\n';
}
cumulativeCount += lastBucket_.count;
cumulativeCount += data->lastBucket.count;
stream << name << "_bucket" << labelsString << "le=\"+Inf\"} " << cumulativeCount << '\n';
if (labelsString.size() == 1) {
@@ -114,7 +117,7 @@ public:
} else {
labelsString.back() = '}';
}
stream << name << "_sum" << labelsString << " " << sum_ << '\n';
stream << name << "_sum" << labelsString << " " << data->sum << '\n';
stream << name << "_count" << labelsString << " " << cumulativeCount << '\n';
}
@@ -128,10 +131,12 @@ private:
std::uint64_t count = 0;
};
std::vector<Bucket> buckets_;
Bucket lastBucket_{std::numeric_limits<ValueType>::max()};
ValueType sum_ = 0;
mutable std::unique_ptr<std::mutex> mutex_ = std::make_unique<std::mutex>();
struct Data {
std::vector<Bucket> buckets;
Bucket lastBucket{std::numeric_limits<ValueType>::max()};
ValueType sum = 0;
};
std::unique_ptr<util::Mutex<Data>> data_ = std::make_unique<util::Mutex<Data>>();
};
} // namespace util::prometheus::impl

View File

@@ -58,17 +58,17 @@ DOSGuard::isOk(std::string const& ip) const noexcept
return true;
{
std::scoped_lock const lck(mtx_);
if (ipState_.find(ip) != ipState_.end()) {
auto [transferedByte, requests] = ipState_.at(ip);
if (transferedByte > maxFetches_ || requests > maxRequestCount_) {
auto lock = mtx_.lock<std::scoped_lock>();
if (lock->ipState.find(ip) != lock->ipState.end()) {
auto [transferredByte, requests] = lock->ipState.at(ip);
if (transferredByte > maxFetches_ || requests > maxRequestCount_) {
LOG(log_.warn()) << "Dosguard: Client surpassed the rate limit. ip = " << ip
<< " Transfered Byte: " << transferedByte << "; Requests: " << requests;
<< " Transfered Byte: " << transferredByte << "; Requests: " << requests;
return false;
}
}
auto it = ipConnCount_.find(ip);
if (it != ipConnCount_.end()) {
auto it = lock->ipConnCount.find(ip);
if (it != lock->ipConnCount.end()) {
if (it->second > maxConnCount_) {
LOG(log_.warn()) << "Dosguard: Client surpassed the rate limit. ip = " << ip
<< " Concurrent connection: " << it->second;
@@ -84,8 +84,8 @@ DOSGuard::increment(std::string const& ip) noexcept
{
if (whitelistHandler_.get().isWhiteListed(ip))
return;
std::scoped_lock const lck{mtx_};
ipConnCount_[ip]++;
auto lock = mtx_.lock<std::scoped_lock>();
lock->ipConnCount[ip]++;
}
void
@@ -93,11 +93,11 @@ DOSGuard::decrement(std::string const& ip) noexcept
{
if (whitelistHandler_.get().isWhiteListed(ip))
return;
std::scoped_lock const lck{mtx_};
ASSERT(ipConnCount_[ip] > 0, "Connection count for ip {} can't be 0", ip);
ipConnCount_[ip]--;
if (ipConnCount_[ip] == 0)
ipConnCount_.erase(ip);
auto lock = mtx_.lock<std::scoped_lock>();
ASSERT(lock->ipConnCount[ip] > 0, "Connection count for ip {} can't be 0", ip);
lock->ipConnCount[ip]--;
if (lock->ipConnCount[ip] == 0)
lock->ipConnCount.erase(ip);
}
[[maybe_unused]] bool
@@ -107,8 +107,8 @@ DOSGuard::add(std::string const& ip, uint32_t numObjects) noexcept
return true;
{
std::scoped_lock const lck(mtx_);
ipState_[ip].transferedByte += numObjects;
auto lock = mtx_.lock<std::scoped_lock>();
lock->ipState[ip].transferedByte += numObjects;
}
return isOk(ip);
@@ -121,8 +121,8 @@ DOSGuard::request(std::string const& ip) noexcept
return true;
{
std::scoped_lock const lck(mtx_);
ipState_[ip].requestsCount++;
auto lock = mtx_.lock<std::scoped_lock>();
lock->ipState[ip].requestsCount++;
}
return isOk(ip);
@@ -131,8 +131,8 @@ DOSGuard::request(std::string const& ip) noexcept
void
DOSGuard::clear() noexcept
{
std::scoped_lock const lck(mtx_);
ipState_.clear();
auto lock = mtx_.lock<std::scoped_lock>();
lock->ipState.clear();
}
[[nodiscard]] std::unordered_set<std::string>

View File

@@ -19,6 +19,7 @@
#pragma once
#include "util/Mutex.hpp"
#include "util/log/Logger.hpp"
#include "util/newconfig/ConfigDefinition.hpp"
#include "web/dosguard/DOSGuardInterface.hpp"
@@ -30,7 +31,6 @@
#include <cstdint>
#include <functional>
#include <mutex>
#include <string>
#include <string_view>
#include <unordered_map>
@@ -52,9 +52,12 @@ class DOSGuard : public DOSGuardInterface {
std::uint32_t requestsCount = 0; /**< Accumulated served requests count */
};
mutable std::mutex mtx_;
std::unordered_map<std::string, ClientState> ipState_;
std::unordered_map<std::string, std::uint32_t> ipConnCount_;
struct State {
std::unordered_map<std::string, ClientState> ipState;
std::unordered_map<std::string, std::uint32_t> ipConnCount;
};
util::Mutex<State> mtx_;
std::reference_wrapper<WhitelistHandlerInterface const> whitelistHandler_;
std::uint32_t const maxFetches_;

View File

@@ -88,7 +88,6 @@ public:
case rpc::ClioError::RpcMalformedRequest:
case rpc::ClioError::RpcMalformedOwner:
case rpc::ClioError::RpcMalformedAddress:
case rpc::ClioError::RpcInvalidHotWallet:
case rpc::ClioError::RpcFieldNotFoundTransaction:
case rpc::ClioError::RpcMalformedOracleDocumentId:
case rpc::ClioError::RpcMalformedAuthorizedCredentials:

Some files were not shown because too many files have changed in this diff Show More