Compare commits

...

15 Commits

Author SHA1 Message Date
Sergey Kuznetsov
d0f88438ca fix: Check config is valid 2025-01-14 16:13:11 +00:00
Alex Kremer
2cf849dd12 feat: ETLng scheduling (#1820)
For #1596
2025-01-14 15:50:59 +00:00
Peter Chen
c47b96bc68 fix: comment from verify config PR (#1823)
PR here: https://github.com/XRPLF/clio/pull/1814#
2025-01-13 12:58:20 -05:00
Sergey Kuznetsov
9659d98140 fix: Reading log_channels levels from config (#1821) 2025-01-13 16:29:45 +00:00
Peter Chen
f1698c55ff feat: add config verify flag (#1814)
fixes #1806
2025-01-13 09:57:11 -05:00
Alex Kremer
91c00e781a fix: Silence expected use after move warnings (#1819)
Fixes #1818
2025-01-10 15:47:48 +00:00
github-actions[bot]
c0d52723c9 style: clang-tidy auto fixes (#1817)
Fixes #1816. Please review and commit clang-tidy fixes.

Co-authored-by: kuznetsss <15742918+kuznetsss@users.noreply.github.com>
2025-01-10 09:06:05 +00:00
Alex Kremer
590c07ad84 fix: AsyncFramework RAII (#1815)
Fixes #1812
2025-01-09 15:26:25 +00:00
Alex Kremer
48c8d85d0c feat: ETLng loader basics (#1808)
For #1597
2025-01-09 14:47:08 +00:00
Alex Kremer
36a9f40a60 fix: Optimize ledger_range query (#1797) 2025-01-07 14:52:56 +00:00
Peter Chen
698718a02a fix: Incorrect log values in newconfig (#1807)
Additional fixes for #1627
2025-01-06 17:37:40 +00:00
Sergey Kuznetsov
0a9dbe1cc1 ci: Switch CI to macos 15 runners (#1761) 2025-01-06 13:19:03 +00:00
Peter Chen
cce7aa2893 style: Fix formatting via clang-format (#1809)
For #1760
2025-01-04 04:00:03 +00:00
Alex Kremer
820b32c6d7 chore: No ALL_CAPS (#1760)
Fixes #1680
2025-01-02 11:39:31 +00:00
github-actions[bot]
efe5d08205 style: clang-tidy auto fixes (#1803)
Fixes #1802. Please review and commit clang-tidy fixes.

Co-authored-by: kuznetsss <15742918+kuznetsss@users.noreply.github.com>
2024-12-23 09:02:34 +00:00
400 changed files with 12019 additions and 9575 deletions

View File

@@ -130,6 +130,7 @@ Checks: '-*,
readability-enum-initial-value,
readability-implicit-bool-conversion,
readability-inconsistent-declaration-parameter-name,
readability-identifier-naming,
readability-make-member-function-const,
readability-math-missing-parentheses,
readability-misleading-indentation,
@@ -149,6 +150,39 @@ Checks: '-*,
CheckOptions:
readability-braces-around-statements.ShortStatementLines: 2
readability-identifier-naming.MacroDefinitionCase: UPPER_CASE
readability-identifier-naming.ClassCase: CamelCase
readability-identifier-naming.StructCase: CamelCase
readability-identifier-naming.UnionCase: CamelCase
readability-identifier-naming.EnumCase: CamelCase
readability-identifier-naming.EnumConstantCase: CamelCase
readability-identifier-naming.ScopedEnumConstantCase: CamelCase
readability-identifier-naming.GlobalConstantCase: UPPER_CASE
readability-identifier-naming.GlobalConstantPrefix: 'k'
readability-identifier-naming.GlobalVariableCase: CamelCase
readability-identifier-naming.GlobalVariablePrefix: 'g'
readability-identifier-naming.ConstexprFunctionCase: camelBack
readability-identifier-naming.ConstexprMethodCase: camelBack
readability-identifier-naming.ClassMethodCase: camelBack
readability-identifier-naming.ClassMemberCase: camelBack
readability-identifier-naming.ClassConstantCase: UPPER_CASE
readability-identifier-naming.ClassConstantPrefix: 'k'
readability-identifier-naming.StaticConstantCase: UPPER_CASE
readability-identifier-naming.StaticConstantPrefix: 'k'
readability-identifier-naming.StaticVariableCase: UPPER_CASE
readability-identifier-naming.StaticVariablePrefix: 'k'
readability-identifier-naming.ConstexprVariableCase: UPPER_CASE
readability-identifier-naming.ConstexprVariablePrefix: 'k'
readability-identifier-naming.LocalConstantCase: camelBack
readability-identifier-naming.LocalVariableCase: camelBack
readability-identifier-naming.TemplateParameterCase: CamelCase
readability-identifier-naming.ParameterCase: camelBack
readability-identifier-naming.FunctionCase: camelBack
readability-identifier-naming.MemberCase: camelBack
readability-identifier-naming.PrivateMemberSuffix: _
readability-identifier-naming.ProtectedMemberSuffix: _
readability-identifier-naming.PublicMemberSuffix: ''
readability-identifier-naming.FunctionIgnoredRegexp: '.*tag_invoke.*'
bugprone-unsafe-functions.ReportMoreUnsafeFunctions: true
bugprone-unused-return-value.CheckedReturnTypes: ::std::error_code;::std::error_condition;::std::errc
misc-include-cleaner.IgnoreHeaders: '.*/(detail|impl)/.*;.*(expected|unexpected).*;.*ranges_lower_bound\.h;time.h;stdlib.h'

View File

@@ -11,7 +11,7 @@ runs:
if: ${{ runner.os == 'macOS' }}
shell: bash
run: |
brew install llvm@14 pkg-config ninja bison cmake ccache jq gh conan@1
brew install llvm@14 pkg-config ninja bison cmake ccache jq gh conan@1 ca-certificates
echo "/opt/homebrew/opt/conan@1/bin" >> $GITHUB_PATH
- name: Fix git permissions on Linux

View File

@@ -15,10 +15,10 @@ runs:
if: ${{ runner.os == 'macOS' }}
shell: bash
env:
CONAN_PROFILE: apple_clang_15
CONAN_PROFILE: apple_clang_16
id: conan_setup_mac
run: |
echo "Creating $CONAN_PROFILE conan profile";
echo "Creating $CONAN_PROFILE conan profile"
conan profile new $CONAN_PROFILE --detect --force
conan profile update settings.compiler.libcxx=libc++ $CONAN_PROFILE
conan profile update settings.compiler.cppstd=20 $CONAN_PROFILE

View File

@@ -74,7 +74,7 @@ jobs:
conan_profile: clang
code_coverage: false
static: true
- os: macos14
- os: macos15
build_type: Release
code_coverage: false
static: false
@@ -197,8 +197,8 @@ jobs:
image: rippleci/clio_ci:latest
conan_profile: clang
build_type: Debug
- os: macos14
conan_profile: apple_clang_15
- os: macos15
conan_profile: apple_clang_16
build_type: Release
runs-on: [self-hosted, "${{ matrix.os }}"]
container: ${{ matrix.container }}

View File

@@ -15,7 +15,7 @@ jobs:
fail-fast: false
matrix:
include:
- os: macos14
- os: macos15
build_type: Release
static: false
- os: heavy
@@ -90,7 +90,7 @@ jobs:
fail-fast: false
matrix:
include:
- os: macos14
- os: macos15
build_type: Release
integration_tests: false
- os: heavy

View File

@@ -188,10 +188,10 @@ public:
static auto
generateData()
{
constexpr auto TOTAL = 10'000;
constexpr auto kTOTAL = 10'000;
std::vector<uint64_t> data;
data.reserve(TOTAL);
for (auto i = 0; i < TOTAL; ++i)
data.reserve(kTOTAL);
for (auto i = 0; i < kTOTAL; ++i)
data.push_back(util::Random::uniform(1, 100'000'000));
return data;

View File

@@ -23,19 +23,19 @@
namespace util::build {
static constexpr char versionString[] = "@CLIO_VERSION@";
static constexpr char versionString[] = "@CLIO_VERSION@"; // NOLINT(readability-identifier-naming)
std::string const&
getClioVersionString()
{
static std::string const value = versionString;
static std::string const value = versionString; // NOLINT(readability-identifier-naming)
return value;
}
std::string const&
getClioFullVersionString()
{
static std::string const value = "clio-" + getClioVersionString();
static std::string const value = "clio-" + getClioVersionString(); // NOLINT(readability-identifier-naming)
return value;
}

View File

@@ -44,9 +44,10 @@ CliArgs::parse(int argc, char const* argv[])
description.add_options()
("help,h", "print help message and exit")
("version,v", "print version and exit")
("conf,c", po::value<std::string>()->default_value(defaultConfigPath), "configuration file")
("conf,c", po::value<std::string>()->default_value(kDEFAULT_CONFIG_PATH), "configuration file")
("ng-web-server,w", "Use ng-web-server")
("migrate", po::value<std::string>(), "start migration helper")
("verify", "Checks the validity of config values")
;
// clang-format on
po::positional_options_description positional;
@@ -75,6 +76,9 @@ CliArgs::parse(int argc, char const* argv[])
return Action{Action::Migrate{.configPath = std::move(configPath), .subCmd = MigrateSubCmd::migration(opt)}};
}
if (parsed.count("verify") != 0u)
return Action{Action::VerifyConfig{.configPath = std::move(configPath)}};
return Action{Action::Run{.configPath = std::move(configPath), .useNgWebServer = parsed.count("ng-web-server") != 0}
};
}

View File

@@ -35,7 +35,7 @@ public:
/**
* @brief Default configuration path.
*/
static constexpr char defaultConfigPath[] = "/etc/opt/clio/config.json";
static constexpr char kDEFAULT_CONFIG_PATH[] = "/etc/opt/clio/config.json";
/**
* @brief An action parsed from the command line.
@@ -59,6 +59,11 @@ public:
MigrateSubCmd subCmd;
};
/** @brief Verify Config action. */
struct VerifyConfig {
std::string configPath;
};
/**
* @brief Construct an action from a Run.
*
@@ -66,7 +71,7 @@ public:
*/
template <typename ActionType>
requires std::is_same_v<ActionType, Run> or std::is_same_v<ActionType, Exit> or
std::is_same_v<ActionType, Migrate>
std::is_same_v<ActionType, Migrate> or std::is_same_v<ActionType, VerifyConfig>
explicit Action(ActionType&& action) : action_(std::forward<ActionType>(action))
{
}
@@ -86,7 +91,7 @@ public:
}
private:
std::variant<Run, Exit, Migrate> action_;
std::variant<Run, Exit, Migrate, VerifyConfig> action_;
};
/**

View File

@@ -101,25 +101,25 @@ ClioApplication::run(bool const useNgWebServer)
auto sweepHandler = web::dosguard::IntervalSweepHandler{config_, ioc, dosGuard};
// Interface to the database
auto backend = data::make_Backend(config_);
auto backend = data::makeBackend(config_);
// Manages clients subscribed to streams
auto subscriptions = feed::SubscriptionManager::make_SubscriptionManager(config_, backend);
auto subscriptions = feed::SubscriptionManager::makeSubscriptionManager(config_, backend);
// Tracks which ledgers have been validated by the network
auto ledgers = etl::NetworkValidatedLedgers::make_ValidatedLedgers();
auto ledgers = etl::NetworkValidatedLedgers::makeValidatedLedgers();
// Handles the connection to one or more rippled nodes.
// ETL uses the balancer to extract data.
// The server uses the balancer to forward RPCs to a rippled node.
// The balancer itself publishes to streams (transactions_proposed and accounts_proposed)
auto balancer = etl::LoadBalancer::make_LoadBalancer(config_, ioc, backend, subscriptions, ledgers);
auto balancer = etl::LoadBalancer::makeLoadBalancer(config_, ioc, backend, subscriptions, ledgers);
// ETL is responsible for writing and publishing to streams. In read-only mode, ETL only publishes
auto etl = etl::ETLService::make_ETLService(config_, ioc, backend, subscriptions, balancer, ledgers);
auto etl = etl::ETLService::makeETLService(config_, ioc, backend, subscriptions, balancer, ledgers);
auto workQueue = rpc::WorkQueue::make_WorkQueue(config_);
auto counters = rpc::Counters::make_Counters(workQueue);
auto workQueue = rpc::WorkQueue::makeWorkQueue(config_);
auto counters = rpc::Counters::makeCounters(workQueue);
auto const amendmentCenter = std::make_shared<data::AmendmentCenter const>(backend);
auto const handlerProvider = std::make_shared<rpc::impl::ProductionHandlerProvider const>(
config_, backend, subscriptions, balancer, etl, amendmentCenter, counters
@@ -127,19 +127,19 @@ ClioApplication::run(bool const useNgWebServer)
using RPCEngineType = rpc::RPCEngine<etl::LoadBalancer, rpc::Counters>;
auto const rpcEngine =
RPCEngineType::make_RPCEngine(config_, backend, balancer, dosGuard, workQueue, counters, handlerProvider);
RPCEngineType::makeRPCEngine(config_, backend, balancer, dosGuard, workQueue, counters, handlerProvider);
if (useNgWebServer or config_.get<bool>("server.__ng_web_server")) {
web::ng::RPCServerHandler<RPCEngineType, etl::ETLService> handler{config_, backend, rpcEngine, etl};
auto expectedAdminVerifier = web::make_AdminVerificationStrategy(config_);
auto expectedAdminVerifier = web::makeAdminVerificationStrategy(config_);
if (not expectedAdminVerifier.has_value()) {
LOG(util::LogService::error()) << "Error creating admin verifier: " << expectedAdminVerifier.error();
return EXIT_FAILURE;
}
auto const adminVerifier = std::move(expectedAdminVerifier).value();
auto httpServer = web::ng::make_Server(config_, OnConnectCheck{dosGuard}, DisconnectHook{dosGuard}, ioc);
auto httpServer = web::ng::makeServer(config_, OnConnectCheck{dosGuard}, DisconnectHook{dosGuard}, ioc);
if (not httpServer.has_value()) {
LOG(util::LogService::error()) << "Error creating web server: " << httpServer.error();
@@ -170,7 +170,7 @@ ClioApplication::run(bool const useNgWebServer)
auto handler =
std::make_shared<web::RPCServerHandler<RPCEngineType, etl::ETLService>>(config_, backend, rpcEngine, etl);
auto const httpServer = web::make_HttpServer(config_, ioc, dosGuard, handler);
auto const httpServer = web::makeHttpServer(config_, ioc, dosGuard, handler);
// Blocks until stopped.
// When stopped, shared_ptrs fall out of scope

View File

@@ -18,6 +18,7 @@
//==============================================================================
#pragma once
#include "util/SignalsHandler.hpp"
#include "util/newconfig/ConfigDefinition.hpp"

57
src/app/VerifyConfig.hpp Normal file
View File

@@ -0,0 +1,57 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/newconfig/ConfigDefinition.hpp"
#include "util/newconfig/ConfigFileJson.hpp"
#include <cstdlib>
#include <iostream>
#include <string_view>
namespace app {
/**
* @brief Verifies user's config values are correct
*
* @param configPath The path to config
* @return true if config values are all correct, false otherwise
*/
inline bool
verifyConfig(std::string_view configPath)
{
using namespace util::config;
auto const json = ConfigFileJson::makeConfigFileJson(configPath);
if (!json.has_value()) {
std::cerr << "Error parsing json from config: " << configPath << "\n" << json.error().error << std::endl;
return false;
}
auto const errors = gClioConfig.parse(json.value());
if (errors.has_value()) {
for (auto const& err : errors.value()) {
std::cerr << "Issues found in provided config '" << configPath << "':\n";
std::cerr << err.error << std::endl;
}
return false;
}
return true;
}
} // namespace app

View File

@@ -97,7 +97,7 @@ HealthCheckHandler::operator()(
boost::asio::yield_context
)
{
static auto constexpr HealthCheckHTML = R"html(
static auto constexpr kHEALTH_CHECK_HTML = R"html(
<!DOCTYPE html>
<html>
<head><title>Test page for Clio</title></head>
@@ -105,7 +105,7 @@ HealthCheckHandler::operator()(
</html>
)html";
return web::ng::Response{boost::beast::http::status::ok, HealthCheckHTML, request};
return web::ng::Response{boost::beast::http::status::ok, kHEALTH_CHECK_HTML, request};
}
} // namespace app

View File

@@ -213,9 +213,9 @@ public:
auto jsonResponse = boost::json::parse(response.message()).as_object();
jsonResponse["warning"] = "load";
if (jsonResponse.contains("warnings") && jsonResponse["warnings"].is_array()) {
jsonResponse["warnings"].as_array().push_back(rpc::makeWarning(rpc::warnRPC_RATE_LIMIT));
jsonResponse["warnings"].as_array().push_back(rpc::makeWarning(rpc::WarnRpcRateLimit));
} else {
jsonResponse["warnings"] = boost::json::array{rpc::makeWarning(rpc::warnRPC_RATE_LIMIT)};
jsonResponse["warnings"] = boost::json::array{rpc::makeWarning(rpc::WarnRpcRateLimit)};
}
response.setMessage(jsonResponse);
}

View File

@@ -50,10 +50,10 @@
namespace {
std::unordered_set<std::string>&
SUPPORTED_AMENDMENTS()
supportedAmendments()
{
static std::unordered_set<std::string> amendments = {};
return amendments;
static std::unordered_set<std::string> kAMENDMENTS = {};
return kAMENDMENTS;
}
bool
@@ -72,8 +72,8 @@ namespace impl {
WritingAmendmentKey::WritingAmendmentKey(std::string amendmentName) : AmendmentKey{std::move(amendmentName)}
{
ASSERT(not SUPPORTED_AMENDMENTS().contains(name), "Attempt to register the same amendment twice");
SUPPORTED_AMENDMENTS().insert(name);
ASSERT(not supportedAmendments().contains(name), "Attempt to register the same amendment twice");
supportedAmendments().insert(name);
}
} // namespace impl
@@ -90,7 +90,7 @@ AmendmentKey::operator std::string_view() const
AmendmentKey::operator ripple::uint256() const
{
return Amendment::GetAmendmentId(name);
return Amendment::getAmendmentId(name);
}
AmendmentCenter::AmendmentCenter(std::shared_ptr<data::BackendInterface> const& backend) : backend_{backend}
@@ -103,9 +103,9 @@ AmendmentCenter::AmendmentCenter(std::shared_ptr<data::BackendInterface> const&
auto const& [name, support] = p;
return Amendment{
.name = name,
.feature = Amendment::GetAmendmentId(name),
.feature = Amendment::getAmendmentId(name),
.isSupportedByXRPL = support != ripple::AmendmentSupport::Unsupported,
.isSupportedByClio = rg::find(SUPPORTED_AMENDMENTS(), name) != rg::end(SUPPORTED_AMENDMENTS()),
.isSupportedByClio = rg::find(supportedAmendments(), name) != rg::end(supportedAmendments()),
.isRetired = support == ripple::AmendmentSupport::Retired
};
}),
@@ -180,7 +180,7 @@ AmendmentCenter::operator[](AmendmentKey const& key) const
}
ripple::uint256
Amendment::GetAmendmentId(std::string_view name)
Amendment::getAmendmentId(std::string_view name)
{
return ripple::sha512Half(ripple::Slice(name.data(), name.size()));
}

View File

@@ -67,6 +67,7 @@ struct Amendments {
// Most of the time it's going to be no changes at all.
/** @cond */
// NOLINTBEGIN(readability-identifier-naming)
REGISTER(OwnerPaysFee);
REGISTER(Flow);
REGISTER(FlowCross);
@@ -154,6 +155,7 @@ struct Amendments {
REGISTER(fix1512);
REGISTER(fix1523);
REGISTER(fix1528);
// NOLINTEND(readability-identifier-naming)
/** @endcond */
};

View File

@@ -36,7 +36,7 @@ namespace data {
namespace {
std::vector<std::int64_t> const histogramBuckets{1, 2, 5, 10, 20, 50, 100, 200, 500, 700, 1000};
std::vector<std::int64_t> const kHISTOGRAM_BUCKETS{1, 2, 5, 10, 20, 50, 100, 200, 500, 700, 1000};
std::int64_t
durationInMillisecondsSince(std::chrono::steady_clock::time_point const startTime)
@@ -69,13 +69,13 @@ BackendCounters::BackendCounters()
, readDurationHistogram_(PrometheusService::histogramInt(
"backend_duration_milliseconds_histogram",
Labels({Label{"operation", "read"}}),
histogramBuckets,
kHISTOGRAM_BUCKETS,
"The duration of backend read operations including retries"
))
, writeDurationHistogram_(PrometheusService::histogramInt(
"backend_duration_milliseconds_histogram",
Labels({Label{"operation", "write"}}),
histogramBuckets,
kHISTOGRAM_BUCKETS,
"The duration of backend write operations including retries"
))
{

View File

@@ -41,9 +41,9 @@ namespace data {
* @return A shared_ptr<BackendInterface> with the selected implementation
*/
inline std::shared_ptr<BackendInterface>
make_Backend(util::config::ClioConfigDefinition const& config)
makeBackend(util::config::ClioConfigDefinition const& config)
{
static util::Logger const log{"Backend"};
static util::Logger const log{"Backend"}; // NOLINT(readability-identifier-naming)
LOG(log.info()) << "Constructing BackendInterface";
auto const readOnly = config.get<bool>("read_only");

View File

@@ -267,7 +267,7 @@ std::optional<LedgerRange>
BackendInterface::fetchLedgerRange() const
{
std::shared_lock const lck(rngMtx_);
return range;
return range_;
}
void
@@ -276,16 +276,16 @@ BackendInterface::updateRange(uint32_t newMax)
std::scoped_lock const lck(rngMtx_);
ASSERT(
!range || newMax >= range->maxSequence,
!range_ || newMax >= range_->maxSequence,
"Range shouldn't exist yet or newMax should be greater. newMax = {}, range->maxSequence = {}",
newMax,
range->maxSequence
range_->maxSequence
);
if (!range) {
range = {.minSequence = newMax, .maxSequence = newMax};
if (!range_) {
range_ = {.minSequence = newMax, .maxSequence = newMax};
} else {
range->maxSequence = newMax;
range_->maxSequence = newMax;
}
}
@@ -296,10 +296,10 @@ BackendInterface::setRange(uint32_t min, uint32_t max, bool force)
if (!force) {
ASSERT(min <= max, "Range min must be less than or equal to max");
ASSERT(not range.has_value(), "Range was already set");
ASSERT(not range_.has_value(), "Range was already set");
}
range = {.minSequence = min, .maxSequence = max};
range_ = {.minSequence = min, .maxSequence = max};
}
LedgerPage
@@ -320,10 +320,10 @@ BackendInterface::fetchLedgerPage(
ripple::uint256 const& curCursor = [&]() {
if (!keys.empty())
return keys.back();
return (cursor ? *cursor : firstKey);
return (cursor ? *cursor : kFIRST_KEY);
}();
std::uint32_t const seq = outOfOrder ? range->maxSequence : ledgerSequence;
std::uint32_t const seq = outOfOrder ? range_->maxSequence : ledgerSequence;
auto succ = fetchSuccessorKey(curCursor, seq, yield);
if (!succ) {

View File

@@ -65,7 +65,7 @@ public:
}
};
static constexpr std::size_t DEFAULT_WAIT_BETWEEN_RETRY = 500;
static constexpr std::size_t kDEFAULT_WAIT_BETWEEN_RETRY = 500;
/**
* @brief A helper function that catches DatabaseTimout exceptions and retries indefinitely.
*
@@ -76,9 +76,9 @@ static constexpr std::size_t DEFAULT_WAIT_BETWEEN_RETRY = 500;
*/
template <typename FnType>
auto
retryOnTimeout(FnType func, size_t waitMs = DEFAULT_WAIT_BETWEEN_RETRY)
retryOnTimeout(FnType func, size_t waitMs = kDEFAULT_WAIT_BETWEEN_RETRY)
{
static util::Logger const log{"Backend"};
static util::Logger const log{"Backend"}; // NOLINT(readability-identifier-naming)
while (true) {
try {
@@ -138,7 +138,7 @@ synchronousAndRetryOnTimeout(FnType&& func)
class BackendInterface {
protected:
mutable std::shared_mutex rngMtx_;
std::optional<LedgerRange> range;
std::optional<LedgerRange> range_;
LedgerCache cache_;
std::optional<etl::CorruptionDetector<LedgerCache>> corruptionDetector_;

View File

@@ -22,6 +22,7 @@
#include "data/BackendInterface.hpp"
#include "data/DBHelpers.hpp"
#include "data/Types.hpp"
#include "data/cassandra/Concepts.hpp"
#include "data/cassandra/Handle.hpp"
#include "data/cassandra/Schema.hpp"
#include "data/cassandra/SettingsProvider.hpp"
@@ -193,7 +194,7 @@ public:
// wait for other threads to finish their writes
executor_.sync();
if (!range) {
if (!range_) {
executor_.writeSync(schema_->updateLedgerRange, ledgerSequence_, false, ledgerSequence_);
}
@@ -649,7 +650,7 @@ public:
{
if (auto const res = executor_.read(yield, schema_->selectSuccessor, key, ledgerSequence); res) {
if (auto const result = res->template get<ripple::uint256>(); result) {
if (*result == lastKey)
if (*result == kLAST_KEY)
return std::nullopt;
return result;
}
@@ -862,7 +863,7 @@ public:
{
LOG(log_.trace()) << " Writing ledger object " << key.size() << ":" << seq << " [" << blob.size() << " bytes]";
if (range)
if (range_)
executor_.write(schema_->insertDiff, seq, key);
executor_.write(schema_->insertObject, std::move(key), seq, std::move(blob));

View File

@@ -190,11 +190,11 @@ template <typename T>
inline bool
isOffer(T const& object)
{
static constexpr short OFFER_OFFSET = 0x006f;
static constexpr short SHIFT = 8;
static constexpr short kOFFER_OFFSET = 0x006f;
static constexpr short kSHIFT = 8;
short offer_bytes = (object[1] << SHIFT) | object[2];
return offer_bytes == OFFER_OFFSET;
short offerBytes = (object[1] << kSHIFT) | object[2];
return offerBytes == kOFFER_OFFSET;
}
/**
@@ -223,9 +223,9 @@ template <typename T>
inline bool
isDirNode(T const& object)
{
static constexpr short DIR_NODE_SPACE_KEY = 0x0064;
static constexpr short kDIR_NODE_SPACE_KEY = 0x0064;
short const spaceKey = (object.data()[1] << 8) | object.data()[2];
return spaceKey == DIR_NODE_SPACE_KEY;
return spaceKey == kDIR_NODE_SPACE_KEY;
}
/**
@@ -273,12 +273,12 @@ template <typename T>
inline ripple::uint256
getBookBase(T const& key)
{
static constexpr size_t KEY_SIZE = 24;
static constexpr size_t kEY_SIZE = 24;
ASSERT(key.size() == ripple::uint256::size(), "Invalid key size {}", key.size());
ripple::uint256 ret;
for (size_t i = 0; i < KEY_SIZE; ++i)
for (size_t i = 0; i < kEY_SIZE; ++i)
ret.data()[i] = key.data()[i];
return ret;
@@ -297,4 +297,4 @@ uint256ToString(ripple::uint256 const& input)
}
/** @brief The ripple epoch start timestamp. Midnight on 1st January 2000. */
static constexpr std::uint32_t rippleEpochStart = 946684800;
static constexpr std::uint32_t kRIPPLE_EPOCH_START = 946684800;

View File

@@ -266,7 +266,7 @@ struct Amendment {
* @return The amendment Id as uint256
*/
static ripple::uint256
GetAmendmentId(std::string_view const name);
getAmendmentId(std::string_view const name);
/**
* @brief Equality comparison operator
@@ -312,8 +312,8 @@ struct AmendmentKey {
operator<=>(AmendmentKey const& other) const = default;
};
constexpr ripple::uint256 firstKey{"0000000000000000000000000000000000000000000000000000000000000000"};
constexpr ripple::uint256 lastKey{"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"};
constexpr ripple::uint256 hi192{"0000000000000000000000000000000000000000000000001111111111111111"};
constexpr ripple::uint256 kFIRST_KEY{"0000000000000000000000000000000000000000000000000000000000000000"};
constexpr ripple::uint256 kLAST_KEY{"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"};
constexpr ripple::uint256 kHI192{"0000000000000000000000000000000000000000000000001111111111111111"};
} // namespace data

View File

@@ -74,7 +74,7 @@ public:
'class': 'SimpleStrategy',
'replication_factor': '{}'
}}
AND durable_writes = true
AND durable_writes = True
)",
settingsProvider_.get().getKeyspace(),
settingsProvider_.get().getReplicationFactor()
@@ -472,7 +472,7 @@ public:
R"(
UPDATE {}
SET sequence = ?
WHERE is_latest = false
WHERE is_latest = False
)",
qualifiedTableName(settingsProvider_.get(), "ledger_range")
));
@@ -776,7 +776,7 @@ public:
R"(
SELECT sequence
FROM {}
WHERE is_latest = true
WHERE is_latest = True
)",
qualifiedTableName(settingsProvider_.get(), "ledger_range")
));
@@ -787,6 +787,7 @@ public:
R"(
SELECT sequence
FROM {}
WHERE is_latest in (True, False)
)",
qualifiedTableName(settingsProvider_.get(), "ledger_range")
));

View File

@@ -100,12 +100,12 @@ SettingsProvider::parseSettings() const
if (config_.getValueView("connect_timeout").hasValue()) {
auto const connectTimeoutSecond = config_.get<uint32_t>("connect_timeout");
settings.connectionTimeout = std::chrono::milliseconds{connectTimeoutSecond * util::MILLISECONDS_PER_SECOND};
settings.connectionTimeout = std::chrono::milliseconds{connectTimeoutSecond * util::kMILLISECONDS_PER_SECOND};
}
if (config_.getValueView("request_timeout").hasValue()) {
auto const requestTimeoutSecond = config_.get<uint32_t>("request_timeout");
settings.requestTimeout = std::chrono::milliseconds{requestTimeoutSecond * util::MILLISECONDS_PER_SECOND};
settings.requestTimeout = std::chrono::milliseconds{requestTimeoutSecond * util::kMILLISECONDS_PER_SECOND};
}
settings.certificate = parseOptionalCertificate();

View File

@@ -31,14 +31,14 @@
#include <vector>
namespace {
constexpr auto batchDeleter = [](CassBatch* ptr) { cass_batch_free(ptr); };
constexpr auto kBATCH_DELETER = [](CassBatch* ptr) { cass_batch_free(ptr); };
} // namespace
namespace data::cassandra::impl {
// TODO: Use an appropriate value instead of CASS_BATCH_TYPE_LOGGED for different use cases
Batch::Batch(std::vector<Statement> const& statements)
: ManagedObject{cass_batch_new(CASS_BATCH_TYPE_LOGGED), batchDeleter}
: ManagedObject{cass_batch_new(CASS_BATCH_TYPE_LOGGED), kBATCH_DELETER}
{
cass_batch_set_is_idempotent(*this, cass_true);

View File

@@ -33,13 +33,13 @@
namespace {
constexpr auto clusterDeleter = [](CassCluster* ptr) { cass_cluster_free(ptr); };
constexpr auto kCLUSTER_DELETER = [](CassCluster* ptr) { cass_cluster_free(ptr); };
}; // namespace
namespace data::cassandra::impl {
Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), clusterDeleter}
Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), kCLUSTER_DELETER}
{
using std::to_string;

View File

@@ -25,6 +25,8 @@
#include <cassandra.h>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <string_view>
@@ -39,10 +41,10 @@ namespace data::cassandra::impl {
* @brief Bundles all cassandra settings in one place.
*/
struct Settings {
static constexpr std::size_t DEFAULT_CONNECTION_TIMEOUT = 10000;
static constexpr uint32_t DEFAULT_MAX_WRITE_REQUESTS_OUTSTANDING = 10'000;
static constexpr uint32_t DEFAULT_MAX_READ_REQUESTS_OUTSTANDING = 100'000;
static constexpr std::size_t DEFAULT_BATCH_SIZE = 20;
static constexpr std::size_t kDEFAULT_CONNECTION_TIMEOUT = 10000;
static constexpr uint32_t kDEFAULT_MAX_WRITE_REQUESTS_OUTSTANDING = 10'000;
static constexpr uint32_t kDEFAULT_MAX_READ_REQUESTS_OUTSTANDING = 100'000;
static constexpr std::size_t kDEFAULT_BATCH_SIZE = 20;
/**
* @brief Represents the configuration of contact points for cassandra.
@@ -63,7 +65,7 @@ struct Settings {
bool enableLog = false;
/** @brief Connect timeout specified in milliseconds */
std::chrono::milliseconds connectionTimeout = std::chrono::milliseconds{DEFAULT_CONNECTION_TIMEOUT};
std::chrono::milliseconds connectionTimeout = std::chrono::milliseconds{kDEFAULT_CONNECTION_TIMEOUT};
/** @brief Request timeout specified in milliseconds */
std::chrono::milliseconds requestTimeout = std::chrono::milliseconds{0}; // no timeout at all
@@ -75,16 +77,16 @@ struct Settings {
uint32_t threads = std::thread::hardware_concurrency();
/** @brief The maximum number of outstanding write requests at any given moment */
uint32_t maxWriteRequestsOutstanding = DEFAULT_MAX_WRITE_REQUESTS_OUTSTANDING;
uint32_t maxWriteRequestsOutstanding = kDEFAULT_MAX_WRITE_REQUESTS_OUTSTANDING;
/** @brief The maximum number of outstanding read requests at any given moment */
uint32_t maxReadRequestsOutstanding = DEFAULT_MAX_READ_REQUESTS_OUTSTANDING;
uint32_t maxReadRequestsOutstanding = kDEFAULT_MAX_READ_REQUESTS_OUTSTANDING;
/** @brief The number of connection per host to always have active */
uint32_t coreConnectionsPerHost = 1u;
/** @brief Size of batches when writing */
std::size_t writeBatchSize = DEFAULT_BATCH_SIZE;
std::size_t writeBatchSize = kDEFAULT_BATCH_SIZE;
/** @brief Size of the IO queue */
std::optional<uint32_t> queueSizeIO = std::nullopt; // NOLINT(readability-redundant-member-init)

View File

@@ -33,7 +33,7 @@
namespace data::cassandra::impl {
class Collection : public ManagedObject<CassCollection> {
static constexpr auto deleter = [](CassCollection* ptr) { cass_collection_free(ptr); };
static constexpr auto kDELETER = [](CassCollection* ptr) { cass_collection_free(ptr); };
static void
throwErrorIfNeeded(CassError const rc, std::string_view const label)
@@ -49,7 +49,7 @@ public:
template <typename Type>
explicit Collection(std::vector<Type> const& value)
: ManagedObject{cass_collection_new(CASS_COLLECTION_TYPE_LIST, value.size()), deleter}
: ManagedObject{cass_collection_new(CASS_COLLECTION_TYPE_LIST, value.size()), kDELETER}
{
bind(value);
}

View File

@@ -32,12 +32,12 @@
#include <utility>
namespace {
constexpr auto futureDeleter = [](CassFuture* ptr) { cass_future_free(ptr); };
constexpr auto kFUTURE_DELETER = [](CassFuture* ptr) { cass_future_free(ptr); };
} // namespace
namespace data::cassandra::impl {
/* implicit */ Future::Future(CassFuture* ptr) : ManagedObject{ptr, futureDeleter}
/* implicit */ Future::Future(CassFuture* ptr) : ManagedObject{ptr, kFUTURE_DELETER}
{
}

View File

@@ -30,8 +30,8 @@ protected:
std::unique_ptr<Managed, void (*)(Managed*)> ptr_;
public:
template <typename deleterCallable>
ManagedObject(Managed* rawPtr, deleterCallable deleter) : ptr_{rawPtr, deleter}
template <typename DeleterCallable>
ManagedObject(Managed* rawPtr, DeleterCallable deleter) : ptr_{rawPtr, deleter}
{
if (rawPtr == nullptr)
throw std::runtime_error("Could not create DB object - got nullptr");

View File

@@ -26,13 +26,13 @@
#include <cstddef>
namespace {
constexpr auto resultDeleter = [](CassResult const* ptr) { cass_result_free(ptr); };
constexpr auto resultIteratorDeleter = [](CassIterator* ptr) { cass_iterator_free(ptr); };
constexpr auto kRESULT_DELETER = [](CassResult const* ptr) { cass_result_free(ptr); };
constexpr auto kRESULT_ITERATOR_DELETER = [](CassIterator* ptr) { cass_iterator_free(ptr); };
} // namespace
namespace data::cassandra::impl {
/* implicit */ Result::Result(CassResult const* ptr) : ManagedObject{ptr, resultDeleter}
/* implicit */ Result::Result(CassResult const* ptr) : ManagedObject{ptr, kRESULT_DELETER}
{
}
@@ -49,7 +49,7 @@ Result::hasRows() const
}
/* implicit */ ResultIterator::ResultIterator(CassIterator* ptr)
: ManagedObject{ptr, resultIteratorDeleter}, hasMore_{cass_iterator_next(ptr) != 0u}
: ManagedObject{ptr, kRESULT_ITERATOR_DELETER}, hasMore_{cass_iterator_next(ptr) != 0u}
{
}

View File

@@ -26,10 +26,10 @@
namespace data::cassandra::impl {
class Session : public ManagedObject<CassSession> {
static constexpr auto deleter = [](CassSession* ptr) { cass_session_free(ptr); };
static constexpr auto kDELETER = [](CassSession* ptr) { cass_session_free(ptr); };
public:
Session() : ManagedObject{cass_session_new(), deleter}
Session() : ManagedObject{cass_session_new(), kDELETER}
{
}
};

View File

@@ -27,12 +27,12 @@
#include <string>
namespace {
constexpr auto contextDeleter = [](CassSsl* ptr) { cass_ssl_free(ptr); };
constexpr auto kCONTEXT_DELETER = [](CassSsl* ptr) { cass_ssl_free(ptr); };
} // namespace
namespace data::cassandra::impl {
SslContext::SslContext(std::string const& certificate) : ManagedObject{cass_ssl_new(), contextDeleter}
SslContext::SslContext(std::string const& certificate) : ManagedObject{cass_ssl_new(), kCONTEXT_DELETER}
{
cass_ssl_set_verify_flags(*this, CASS_SSL_VERIFY_NONE);
if (auto const rc = cass_ssl_add_trusted_cert(*this, certificate.c_str()); rc != CASS_OK) {

View File

@@ -43,7 +43,7 @@
namespace data::cassandra::impl {
class Statement : public ManagedObject<CassStatement> {
static constexpr auto deleter = [](CassStatement* ptr) { cass_statement_free(ptr); };
static constexpr auto kDELETER = [](CassStatement* ptr) { cass_statement_free(ptr); };
public:
/**
@@ -54,14 +54,14 @@ public:
*/
template <typename... Args>
explicit Statement(std::string_view query, Args&&... args)
: ManagedObject{cass_statement_new_n(query.data(), query.size(), sizeof...(args)), deleter}
: ManagedObject{cass_statement_new_n(query.data(), query.size(), sizeof...(args)), kDELETER}
{
cass_statement_set_consistency(*this, CASS_CONSISTENCY_QUORUM);
cass_statement_set_is_idempotent(*this, cass_true);
bind<Args...>(std::forward<Args>(args)...);
}
/* implicit */ Statement(CassStatement* ptr) : ManagedObject{ptr, deleter}
/* implicit */ Statement(CassStatement* ptr) : ManagedObject{ptr, kDELETER}
{
cass_statement_set_consistency(*this, CASS_CONSISTENCY_QUORUM);
cass_statement_set_is_idempotent(*this, cass_true);
@@ -153,10 +153,10 @@ public:
* This is used to produce Statement objects that can be executed.
*/
class PreparedStatement : public ManagedObject<CassPrepared const> {
static constexpr auto deleter = [](CassPrepared const* ptr) { cass_prepared_free(ptr); };
static constexpr auto kDELETER = [](CassPrepared const* ptr) { cass_prepared_free(ptr); };
public:
/* implicit */ PreparedStatement(CassPrepared const* ptr) : ManagedObject{ptr, deleter}
/* implicit */ PreparedStatement(CassPrepared const* ptr) : ManagedObject{ptr, kDELETER}
{
}

View File

@@ -24,17 +24,17 @@
#include <cassandra.h>
namespace {
constexpr auto tupleDeleter = [](CassTuple* ptr) { cass_tuple_free(ptr); };
constexpr auto tupleIteratorDeleter = [](CassIterator* ptr) { cass_iterator_free(ptr); };
constexpr auto kTUPLE_DELETER = [](CassTuple* ptr) { cass_tuple_free(ptr); };
constexpr auto kTUPLE_ITERATOR_DELETER = [](CassIterator* ptr) { cass_iterator_free(ptr); };
} // namespace
namespace data::cassandra::impl {
/* implicit */ Tuple::Tuple(CassTuple* ptr) : ManagedObject{ptr, tupleDeleter}
/* implicit */ Tuple::Tuple(CassTuple* ptr) : ManagedObject{ptr, kTUPLE_DELETER}
{
}
/* implicit */ TupleIterator::TupleIterator(CassIterator* ptr) : ManagedObject{ptr, tupleIteratorDeleter}
/* implicit */ TupleIterator::TupleIterator(CassIterator* ptr) : ManagedObject{ptr, kTUPLE_ITERATOR_DELETER}
{
}

View File

@@ -37,14 +37,14 @@
namespace data::cassandra::impl {
class Tuple : public ManagedObject<CassTuple> {
static constexpr auto deleter = [](CassTuple* ptr) { cass_tuple_free(ptr); };
static constexpr auto kDELETER = [](CassTuple* ptr) { cass_tuple_free(ptr); };
public:
/* implicit */ Tuple(CassTuple* ptr);
template <typename... Types>
explicit Tuple(std::tuple<Types...>&& value)
: ManagedObject{cass_tuple_new(std::tuple_size<std::tuple<Types...>>{}), deleter}
: ManagedObject{cass_tuple_new(std::tuple_size<std::tuple<Types...>>{}), kDELETER}
{
std::apply(std::bind_front(&Tuple::bind<Types...>, this), std::move(value));
}

View File

@@ -69,7 +69,7 @@ public:
std::shared_ptr<BackendInterface> const& backend,
CacheType& cache
)
: backend_{backend}, cache_{cache}, settings_{make_CacheLoaderSettings(config)}, ctx_{settings_.numThreads}
: backend_{backend}, cache_{cache}, settings_{makeCacheLoaderSettings(config)}, ctx_{settings_.numThreads}
{
}

View File

@@ -48,7 +48,7 @@ CacheLoaderSettings::isDisabled() const
}
[[nodiscard]] CacheLoaderSettings
make_CacheLoaderSettings(util::config::ClioConfigDefinition const& config)
makeCacheLoaderSettings(util::config::ClioConfigDefinition const& config)
{
CacheLoaderSettings settings;
settings.numThreads = config.get<uint16_t>("io_threads");

View File

@@ -64,6 +64,6 @@ struct CacheLoaderSettings {
* @returns The CacheLoaderSettings object
*/
[[nodiscard]] CacheLoaderSettings
make_CacheLoaderSettings(util::config::ClioConfigDefinition const& config);
makeCacheLoaderSettings(util::config::ClioConfigDefinition const& config);
} // namespace etl

View File

@@ -88,9 +88,9 @@ ETLService::runETLPipeline(uint32_t startSequence, uint32_t numExtractors)
auto const end = std::chrono::system_clock::now();
auto const lastPublishedSeq = ledgerPublisher_.getLastPublishedSequence();
static constexpr auto NANOSECONDS_PER_SECOND = 1'000'000'000.0;
static constexpr auto kNANOSECONDS_PER_SECOND = 1'000'000'000.0;
LOG(log_.debug()) << "Extracted and wrote " << lastPublishedSeq.value_or(startSequence) - startSequence << " in "
<< ((end - begin).count()) / NANOSECONDS_PER_SECOND;
<< ((end - begin).count()) / kNANOSECONDS_PER_SECOND;
state_.isWriting = false;
@@ -134,7 +134,7 @@ ETLService::monitor()
}
} catch (std::runtime_error const& e) {
LOG(log_.fatal()) << "Failed to load initial ledger: " << e.what();
amendmentBlockHandler_.onAmendmentBlock();
amendmentBlockHandler_.notifyAmendmentBlocked();
return;
}
@@ -168,7 +168,7 @@ ETLService::publishNextSequence(uint32_t nextSequence)
if (auto rng = backend_->hardFetchLedgerRangeNoThrow(); rng && rng->maxSequence >= nextSequence) {
ledgerPublisher_.publish(nextSequence, {});
++nextSequence;
} else if (networkValidatedLedgers_->waitUntilValidatedByNetwork(nextSequence, util::MILLISECONDS_PER_SECOND)) {
} else if (networkValidatedLedgers_->waitUntilValidatedByNetwork(nextSequence, util::kMILLISECONDS_PER_SECOND)) {
LOG(log_.info()) << "Ledger with sequence = " << nextSequence << " has been validated by the network. "
<< "Attempting to find in database and publish";
@@ -178,8 +178,8 @@ ETLService::publishNextSequence(uint32_t nextSequence)
// database after the specified number of attempts. publishLedger()
// waits one second between each attempt to read the ledger from the
// database
constexpr size_t timeoutSeconds = 10;
bool const success = ledgerPublisher_.publish(nextSequence, timeoutSeconds);
constexpr size_t kTIMEOUT_SECONDS = 10;
bool const success = ledgerPublisher_.publish(nextSequence, kTIMEOUT_SECONDS);
if (!success) {
LOG(log_.warn()) << "Failed to publish ledger with sequence = " << nextSequence << " . Beginning ETL";
@@ -233,7 +233,7 @@ ETLService::monitorReadOnly()
// if we can't, wait until it's validated by the network, or 1 second passes, whichever occurs
// first. Even if we don't hear from rippled, if ledgers are being written to the db, we publish
// them.
networkValidatedLedgers_->waitUntilValidatedByNetwork(latestSequence, util::MILLISECONDS_PER_SECOND);
networkValidatedLedgers_->waitUntilValidatedByNetwork(latestSequence, util::kMILLISECONDS_PER_SECOND);
}
}
}

View File

@@ -141,7 +141,7 @@ public:
* @return A shared pointer to a new instance of ETLService
*/
static std::shared_ptr<ETLService>
make_ETLService(
makeETLService(
util::config::ClioConfigDefinition const& config,
boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend,

View File

@@ -59,7 +59,7 @@ using namespace util::config;
namespace etl {
std::shared_ptr<LoadBalancer>
LoadBalancer::make_LoadBalancer(
LoadBalancer::makeLoadBalancer(
ClioConfigDefinition const& config,
boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend,
@@ -235,7 +235,7 @@ LoadBalancer::forwardToRippled(
)
{
if (not request.contains("command"))
return std::unexpected{rpc::ClioError::rpcCOMMAND_IS_MISSING};
return std::unexpected{rpc::ClioError::RpcCommandIsMissing};
auto const cmd = boost::json::value_to<std::string>(request.at("command"));
if (forwardingCache_) {
@@ -249,10 +249,10 @@ LoadBalancer::forwardToRippled(
auto numAttempts = 0u;
auto xUserValue = isAdmin ? ADMIN_FORWARDING_X_USER_VALUE : USER_FORWARDING_X_USER_VALUE;
auto xUserValue = isAdmin ? kADMIN_FORWARDING_X_USER_VALUE : kUSER_FORWARDING_X_USER_VALUE;
std::optional<boost::json::object> response;
rpc::ClioError error = rpc::ClioError::etlCONNECTION_ERROR;
rpc::ClioError error = rpc::ClioError::EtlConnectionError;
while (numAttempts < sources_.size()) {
auto res = sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield);
if (res) {

View File

@@ -65,7 +65,7 @@ public:
using OptionalGetLedgerResponseType = std::optional<GetLedgerResponseType>;
private:
static constexpr std::uint32_t DEFAULT_DOWNLOAD_RANGES = 16;
static constexpr std::uint32_t kDEFAULT_DOWNLOAD_RANGES = 16;
util::Logger log_{"ETL"};
// Forwarding cache must be destroyed after sources because sources have a callback to invalidate cache
@@ -75,7 +75,7 @@ private:
std::vector<SourcePtr> sources_;
std::optional<ETLState> etlState_;
std::uint32_t downloadRanges_ =
DEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading initial ledger */
kDEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading initial ledger */
// Using mutext instead of atomic_bool because choosing a new source to
// forward messages should be done with a mutual exclusion otherwise there will be a race condition
@@ -85,12 +85,12 @@ public:
/**
* @brief Value for the X-User header when forwarding admin requests
*/
static constexpr std::string_view ADMIN_FORWARDING_X_USER_VALUE = "clio_admin";
static constexpr std::string_view kADMIN_FORWARDING_X_USER_VALUE = "clio_admin";
/**
* @brief Value for the X-User header when forwarding user requests
*/
static constexpr std::string_view USER_FORWARDING_X_USER_VALUE = "clio_user";
static constexpr std::string_view kUSER_FORWARDING_X_USER_VALUE = "clio_user";
/**
* @brief Create an instance of the load balancer.
@@ -108,7 +108,7 @@ public:
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
SourceFactory sourceFactory = make_Source
SourceFactory sourceFactory = makeSource
);
/**
@@ -123,13 +123,13 @@ public:
* @return A shared pointer to a new instance of LoadBalancer
*/
static std::shared_ptr<LoadBalancer>
make_LoadBalancer(
makeLoadBalancer(
util::config::ClioConfigDefinition const& config,
boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
SourceFactory sourceFactory = make_Source
SourceFactory sourceFactory = makeSource
);
~LoadBalancer();

View File

@@ -27,7 +27,7 @@
namespace etl {
std::shared_ptr<NetworkValidatedLedgers>
NetworkValidatedLedgers::make_ValidatedLedgers()
NetworkValidatedLedgers::makeValidatedLedgers()
{
return std::make_shared<NetworkValidatedLedgers>();
}

View File

@@ -51,7 +51,7 @@ public:
* @return A shared pointer to a new instance of NetworkValidatedLedgers
*/
static std::shared_ptr<NetworkValidatedLedgers>
make_ValidatedLedgers();
makeValidatedLedgers();
/**
* @brief Notify the datastructure that idx has been validated by the network.

View File

@@ -38,7 +38,7 @@
namespace etl {
SourcePtr
make_Source(
makeSource(
util::config::ObjectView const& config,
boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend,

View File

@@ -175,7 +175,7 @@ using SourceFactory = std::function<SourcePtr(
* @return The created source
*/
SourcePtr
make_Source(
makeSource(
util::config::ObjectView const& config,
boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend,

View File

@@ -30,8 +30,8 @@
namespace etl::impl {
AmendmentBlockHandler::ActionType const AmendmentBlockHandler::defaultAmendmentBlockAction = []() {
static util::Logger const log{"ETL"};
AmendmentBlockHandler::ActionType const AmendmentBlockHandler::kDEFAULT_AMENDMENT_BLOCK_ACTION = []() {
static util::Logger const log{"ETL"}; // NOLINT(readability-identifier-naming)
LOG(log.fatal()) << "Can't process new ledgers: The current ETL source is not compatible with the version of "
<< "the libxrpl Clio is currently using. Please upgrade Clio to a newer version.";
};
@@ -47,7 +47,7 @@ AmendmentBlockHandler::AmendmentBlockHandler(
}
void
AmendmentBlockHandler::onAmendmentBlock()
AmendmentBlockHandler::notifyAmendmentBlocked()
{
state_.get().isAmendmentBlocked = true;
repeat_.start(interval_, action_);

View File

@@ -43,17 +43,17 @@ private:
ActionType action_;
public:
static ActionType const defaultAmendmentBlockAction;
static ActionType const kDEFAULT_AMENDMENT_BLOCK_ACTION;
AmendmentBlockHandler(
boost::asio::io_context& ioc,
SystemState& state,
std::chrono::steady_clock::duration interval = std::chrono::seconds{1},
ActionType action = defaultAmendmentBlockAction
ActionType action = kDEFAULT_AMENDMENT_BLOCK_ACTION
);
void
onAmendmentBlock();
notifyAmendmentBlocked();
};
} // namespace etl::impl

View File

@@ -58,10 +58,10 @@ public:
}();
rg::sort(accountRoots);
std::vector<ripple::uint256> cursors{data::firstKey};
std::vector<ripple::uint256> cursors{data::kFIRST_KEY};
rg::copy(accountRoots.begin(), accountRoots.end(), std::back_inserter(cursors));
rg::sort(cursors);
cursors.push_back(data::lastKey);
cursors.push_back(data::kLAST_KEY);
std::vector<CursorPair> pairs;
pairs.reserve(cursors.size());

View File

@@ -85,10 +85,10 @@ public:
);
}
std::vector<ripple::uint256> cursors{data::firstKey};
std::vector<ripple::uint256> cursors{data::kFIRST_KEY};
rg::copy(liveCursors | vs::take(std::min(liveCursors.size(), numCursors_)), std::back_inserter(cursors));
rg::sort(cursors);
cursors.push_back(data::lastKey);
cursors.push_back(data::kLAST_KEY);
std::vector<CursorPair> pairs;
pairs.reserve(cursors.size());

View File

@@ -74,14 +74,14 @@ public:
rg::unique(diffs, [](auto const& a, auto const& b) { return a.key == b.key; });
diffs.erase(removalCursor, last);
std::vector<ripple::uint256> cursors{data::firstKey};
std::vector<ripple::uint256> cursors{data::kFIRST_KEY};
rg::copy(
diffs //
| vs::filter([](auto const& obj) { return not obj.blob.empty(); }) //
| vs::transform([](auto const& obj) { return obj.key; }),
std::back_inserter(cursors)
);
cursors.push_back(data::lastKey); // last pair should cover the remaining range
cursors.push_back(data::kLAST_KEY); // last pair should cover the remaining range
std::vector<CursorPair> pairs;
pairs.reserve(cursors.size());

View File

@@ -39,7 +39,7 @@ public:
using DataType = std::optional<RawDataType>;
using QueueType = ThreadSafeQueue<DataType>; // TODO: probably should use boost::lockfree::queue instead?
constexpr static auto TOTAL_MAX_IN_QUEUE = 1000u;
static constexpr auto kTOTAL_MAX_IN_QUEUE = 1000u;
private:
util::Logger log_{"ETL"};
@@ -58,7 +58,7 @@ public:
*/
ExtractionDataPipe(uint32_t stride, uint32_t startSequence) : stride_{stride}, startSequence_{startSequence}
{
auto const maxQueueSize = TOTAL_MAX_IN_QUEUE / stride;
auto const maxQueueSize = kTOTAL_MAX_IN_QUEUE / stride;
for (size_t i = 0; i < stride_; ++i)
queues_.push_back(std::make_unique<QueueType>(maxQueueSize));
}

View File

@@ -44,13 +44,13 @@ ForwardingSource::ForwardingSource(
std::string ip,
std::string wsPort,
std::chrono::steady_clock::duration forwardingTimeout,
std::chrono::steady_clock::duration connectionTimeout
std::chrono::steady_clock::duration connTimeout
)
: log_(fmt::format("ForwardingSource[{}:{}]", ip, wsPort))
, connectionBuilder_(std::move(ip), std::move(wsPort))
, forwardingTimeout_{forwardingTimeout}
{
connectionBuilder_.setConnectionTimeout(connectionTimeout)
connectionBuilder_.setConnectionTimeout(connTimeout)
.addHeader(
{boost::beast::http::field::user_agent, fmt::format("{} websocket-client-coro", BOOST_BEAST_VERSION_STRING)}
);
@@ -76,14 +76,14 @@ ForwardingSource::forwardToRippled(
auto expectedConnection = connectionBuilder.connect(yield);
if (not expectedConnection) {
LOG(log_.debug()) << "Couldn't connect to rippled to forward request.";
return std::unexpected{rpc::ClioError::etlCONNECTION_ERROR};
return std::unexpected{rpc::ClioError::EtlConnectionError};
}
auto& connection = expectedConnection.value();
auto writeError = connection->write(boost::json::serialize(request), yield, forwardingTimeout_);
if (writeError) {
LOG(log_.debug()) << "Error sending request to rippled to forward request.";
return std::unexpected{rpc::ClioError::etlREQUEST_ERROR};
return std::unexpected{rpc::ClioError::EtlRequestError};
}
auto response = connection->read(yield, forwardingTimeout_);
@@ -91,10 +91,10 @@ ForwardingSource::forwardToRippled(
if (auto errorCode = response.error().errorCode();
errorCode.has_value() and errorCode->value() == boost::system::errc::timed_out) {
LOG(log_.debug()) << "Request to rippled timed out";
return std::unexpected{rpc::ClioError::etlREQUEST_TIMEOUT};
return std::unexpected{rpc::ClioError::EtlRequestTimeout};
}
LOG(log_.debug()) << "Error sending request to rippled to forward request.";
return std::unexpected{rpc::ClioError::etlREQUEST_ERROR};
return std::unexpected{rpc::ClioError::EtlRequestError};
}
boost::json::value parsedResponse;
@@ -104,7 +104,7 @@ ForwardingSource::forwardToRippled(
throw std::runtime_error("response is not an object");
} catch (std::exception const& e) {
LOG(log_.debug()) << "Error parsing response from rippled: " << e.what() << ". Response: " << *response;
return std::unexpected{rpc::ClioError::etlINVALID_RESPONSE};
return std::unexpected{rpc::ClioError::EtlInvalidResponse};
}
auto responseObject = parsedResponse.as_object();

View File

@@ -39,14 +39,14 @@ class ForwardingSource {
util::requests::WsConnectionBuilder connectionBuilder_;
std::chrono::steady_clock::duration forwardingTimeout_;
static constexpr std::chrono::seconds CONNECTION_TIMEOUT{3};
static constexpr std::chrono::seconds kCONNECTION_TIMEOUT{3};
public:
ForwardingSource(
std::string ip,
std::string wsPort,
std::chrono::steady_clock::duration forwardingTimeout,
std::chrono::steady_clock::duration connectionTimeout = CONNECTION_TIMEOUT
std::chrono::steady_clock::duration connTimeout = ForwardingSource::kCONNECTION_TIMEOUT
);
/**

View File

@@ -131,8 +131,8 @@ public:
result.mptHoldersData.push_back(*maybeMPTHolder);
result.accountTxData.emplace_back(txMeta, sttx.getTransactionID());
static constexpr std::size_t KEY_SIZE = 32;
std::string keyStr{reinterpret_cast<char const*>(sttx.getTransactionID().data()), KEY_SIZE};
static constexpr std::size_t kEY_SIZE = 32;
std::string keyStr{reinterpret_cast<char const*>(sttx.getTransactionID().data()), kEY_SIZE};
backend_->writeTransaction(
std::move(keyStr),
ledger.seq,
@@ -204,10 +204,10 @@ public:
backend_->writeSuccessor(std::move(key), sequence, uint256ToString(succ->key));
}
ripple::uint256 prev = data::firstKey;
ripple::uint256 prev = data::kFIRST_KEY;
while (auto cur = backend_->cache().getSuccessor(prev, sequence)) {
ASSERT(cur.has_value(), "Succesor for key {} must exist", ripple::strHex(prev));
if (prev == data::firstKey)
if (prev == data::kFIRST_KEY)
backend_->writeSuccessor(uint256ToString(prev), sequence, uint256ToString(cur->key));
if (isBookDir(cur->key, cur->blob)) {
@@ -228,12 +228,12 @@ public:
}
prev = cur->key;
static constexpr std::size_t LOG_INTERVAL = 100000;
if (numWrites % LOG_INTERVAL == 0 && numWrites != 0)
static constexpr std::size_t kLOG_INTERVAL = 100000;
if (numWrites % kLOG_INTERVAL == 0 && numWrites != 0)
LOG(log_.info()) << "Wrote " << numWrites << " book successors";
}
backend_->writeSuccessor(uint256ToString(prev), sequence, uint256ToString(data::lastKey));
backend_->writeSuccessor(uint256ToString(prev), sequence, uint256ToString(data::kLAST_KEY));
++numWrites;
});

View File

@@ -184,8 +184,8 @@ public:
// if the ledger closed over MAX_LEDGER_AGE_SECONDS ago, assume we are still catching up and don't publish
// TODO: this probably should be a strategy
static constexpr std::uint32_t MAX_LEDGER_AGE_SECONDS = 600;
if (age < MAX_LEDGER_AGE_SECONDS) {
static constexpr std::uint32_t kMAX_LEDGER_AGE_SECONDS = 600;
if (age < kMAX_LEDGER_AGE_SECONDS) {
std::optional<ripple::Fees> fees = data::synchronousAndRetryOnTimeout([&](auto yield) {
return backend_->fetchFees(lgrInfo.seq, yield);
});
@@ -260,9 +260,9 @@ public:
auto now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
.count();
auto closeTime = lastCloseTime_.time_since_epoch().count();
if (now < (rippleEpochStart + closeTime))
if (now < (kRIPPLE_EPOCH_START + closeTime))
return 0;
return now - (rippleEpochStart + closeTime);
return now - (kRIPPLE_EPOCH_START + closeTime);
}
/**

View File

@@ -77,7 +77,7 @@ SubscriptionSource::SubscriptionSource(
, subscriptions_(std::move(subscriptions))
, strand_(boost::asio::make_strand(ioContext))
, wsTimeout_(wsTimeout)
, retry_(util::makeRetryExponentialBackoff(retryDelay, RETRY_MAX_DELAY, strand_))
, retry_(util::makeRetryExponentialBackoff(retryDelay, kRETRY_MAX_DELAY, strand_))
, onConnect_(std::move(onConnect))
, onDisconnect_(std::move(onDisconnect))
, onLedgerClosed_(std::move(onLedgerClosed))
@@ -222,9 +222,9 @@ SubscriptionSource::handleMessage(std::string const& message)
auto const object = raw.as_object();
uint32_t ledgerIndex = 0;
static constexpr char const* const JS_LedgerClosed = "ledgerClosed";
static constexpr char const* const JS_ValidationReceived = "validationReceived";
static constexpr char const* const JS_ManifestReceived = "manifestReceived";
static constexpr auto kJS_LEDGER_CLOSED = "ledgerClosed";
static constexpr auto kJS_VALIDATION_RECEIVED = "validationReceived";
static constexpr auto kJS_MANIFEST_RECEIVED = "manifestReceived";
if (object.contains(JS(result))) {
auto const& result = object.at(JS(result)).as_object();
@@ -237,7 +237,7 @@ SubscriptionSource::handleMessage(std::string const& message)
}
LOG(log_.debug()) << "Received a message on ledger subscription stream. Message: " << object;
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_LedgerClosed) {
} else if (object.contains(JS(type)) && object.at(JS(type)) == kJS_LEDGER_CLOSED) {
LOG(log_.debug()) << "Received a message of type 'ledgerClosed' on ledger subscription stream. Message: "
<< object;
if (object.contains(JS(ledger_index))) {
@@ -259,10 +259,10 @@ SubscriptionSource::handleMessage(std::string const& message)
if (object.contains(JS(transaction)) and !object.contains(JS(meta))) {
LOG(log_.debug()) << "Forwarding proposed transaction: " << object;
subscriptions_->forwardProposedTransaction(object);
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ValidationReceived) {
} else if (object.contains(JS(type)) && object.at(JS(type)) == kJS_VALIDATION_RECEIVED) {
LOG(log_.debug()) << "Forwarding validation: " << object;
subscriptions_->forwardValidation(object);
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ManifestReceived) {
} else if (object.contains(JS(type)) && object.at(JS(type)) == kJS_MANIFEST_RECEIVED) {
LOG(log_.debug()) << "Forwarding manifest: " << object;
subscriptions_->forwardManifest(object);
}
@@ -363,12 +363,12 @@ SubscriptionSource::setValidatedRange(std::string range)
std::string const&
SubscriptionSource::getSubscribeCommandJson()
{
static boost::json::object const jsonValue{
static boost::json::object const kJSON_VALUE{
{"command", "subscribe"},
{"streams", {"ledger", "manifests", "validations", "transactions_proposed"}},
};
static std::string const jsonString = boost::json::serialize(jsonValue);
return jsonString;
static std::string const kJSON_STRING = boost::json::serialize(kJSON_VALUE);
return kJSON_STRING;
}
} // namespace etl::impl

View File

@@ -91,9 +91,9 @@ private:
std::future<void> runFuture_;
static constexpr std::chrono::seconds WS_TIMEOUT{30};
static constexpr std::chrono::seconds RETRY_MAX_DELAY{30};
static constexpr std::chrono::seconds RETRY_DELAY{1};
static constexpr std::chrono::seconds kWS_TIMEOUT{30};
static constexpr std::chrono::seconds kRETRY_MAX_DELAY{30};
static constexpr std::chrono::seconds kRETRY_DELAY{1};
public:
/**
@@ -107,8 +107,7 @@ public:
* @param subscriptions The subscription manager object
* @param onConnect The onConnect hook. Called when the connection is established
* @param onDisconnect The onDisconnect hook. Called when the connection is lost
* @param onLedgerClosed The onLedgerClosed hook. Called when the ledger is closed but only if the source is
* forwarding
* @param onLedgerClosed The onLedgerClosed hook. Called when the ledger is closed if the source is forwarding
* @param wsTimeout A timeout for websocket operations. Defaults to 30 seconds
* @param retryDelay The retry delay. Defaults to 1 second
*/
@@ -121,8 +120,8 @@ public:
OnConnectHook onConnect,
OnDisconnectHook onDisconnect,
OnLedgerClosedHook onLedgerClosed,
std::chrono::steady_clock::duration const wsTimeout = WS_TIMEOUT,
std::chrono::steady_clock::duration const retryDelay = RETRY_DELAY
std::chrono::steady_clock::duration const wsTimeout = SubscriptionSource::kWS_TIMEOUT,
std::chrono::steady_clock::duration const retryDelay = SubscriptionSource::kRETRY_DELAY
);
/**

View File

@@ -203,7 +203,7 @@ private:
} catch (std::runtime_error const& e) {
LOG(log_.fatal()) << "Failed to build next ledger: " << e.what();
amendmentBlockHandler_.get().onAmendmentBlock();
amendmentBlockHandler_.get().notifyAmendmentBlocked();
return {ripple::LedgerHeader{}, false};
}
@@ -308,11 +308,11 @@ private:
auto lb = backend_->cache().getPredecessor(obj.key, lgrInfo.seq);
if (!lb)
lb = {.key = data::firstKey, .blob = {}};
lb = {.key = data::kFIRST_KEY, .blob = {}};
auto ub = backend_->cache().getSuccessor(obj.key, lgrInfo.seq);
if (!ub)
ub = {.key = data::lastKey, .blob = {}};
ub = {.key = data::kLAST_KEY, .blob = {}};
if (obj.blob.empty()) {
LOG(log_.debug()) << "writing successor for deleted object " << ripple::strHex(obj.key) << " - "
@@ -336,10 +336,10 @@ private:
LOG(log_.debug()) << "Updating book successor " << ripple::strHex(base) << " - "
<< ripple::strHex(succ->key);
} else {
backend_->writeSuccessor(uint256ToString(base), lgrInfo.seq, uint256ToString(data::lastKey));
backend_->writeSuccessor(uint256ToString(base), lgrInfo.seq, uint256ToString(data::kLAST_KEY));
LOG(log_.debug()) << "Updating book successor " << ripple::strHex(base) << " - "
<< ripple::strHex(data::lastKey);
<< ripple::strHex(data::kLAST_KEY);
}
}
}
@@ -361,7 +361,7 @@ private:
for (auto& obj : *(rawData.mutable_book_successors())) {
auto firstBook = std::move(*obj.mutable_first_book());
if (!firstBook.size())
firstBook = uint256ToString(data::lastKey);
firstBook = uint256ToString(data::kLAST_KEY);
LOG(log_.debug()) << "writing book successor " << ripple::strHex(obj.book_base()) << " - "
<< ripple::strHex(firstBook);
@@ -372,10 +372,10 @@ private:
if (obj.mod_type() != RawLedgerObjectType::MODIFIED) {
std::string* predPtr = obj.mutable_predecessor();
if (predPtr->empty())
*predPtr = uint256ToString(data::firstKey);
*predPtr = uint256ToString(data::kFIRST_KEY);
std::string* succPtr = obj.mutable_successor();
if (succPtr->empty())
*succPtr = uint256ToString(data::lastKey);
*succPtr = uint256ToString(data::kLAST_KEY);
if (obj.mod_type() == RawLedgerObjectType::DELETED) {
LOG(log_.debug()) << "Modifying successors for deleted object " << ripple::strHex(obj.key())

View File

@@ -0,0 +1,37 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
namespace etlng {
/**
* @brief The interface of a handler for amendment blocking
*/
struct AmendmentBlockHandlerInterface {
virtual ~AmendmentBlockHandlerInterface() = default;
/**
* @brief The function to call once an amendment block has been discovered
*/
virtual void
notifyAmendmentBlocked() = 0;
};
} // namespace etlng

View File

@@ -1,5 +1,8 @@
add_library(clio_etlng)
target_sources(clio_etlng PRIVATE impl/AsyncGrpcCall.cpp impl/Extraction.cpp impl/GrpcSource.cpp)
target_sources(
clio_etlng PRIVATE impl/AmendmentBlockHandler.cpp impl/AsyncGrpcCall.cpp impl/Extraction.cpp impl/GrpcSource.cpp
impl/Loading.cpp
)
target_link_libraries(clio_etlng PUBLIC clio_data)

View File

@@ -0,0 +1,134 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
/** @file */
#pragma once
#include "etl/ETLState.hpp"
#include "etlng/InitialLoadObserverInterface.hpp"
#include "rpc/Errors.hpp"
#include <boost/asio/spawn.hpp>
#include <boost/json/object.hpp>
#include <boost/json/value.hpp>
#include <org/xrpl/rpc/v1/ledger.pb.h>
#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger.pb.h>
#include <chrono>
#include <cstdint>
#include <expected>
#include <optional>
#include <string>
#include <vector>
namespace etlng {
/**
* @brief An interface for LoadBalancer
*/
class LoadBalancerInterface {
public:
using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject;
using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse;
using OptionalGetLedgerResponseType = std::optional<GetLedgerResponseType>;
virtual ~LoadBalancerInterface() = default;
/**
* @brief Load the initial ledger, writing data to the queue.
* @note This function will retry indefinitely until the ledger is downloaded.
*
* @param sequence Sequence of ledger to download
* @param loader InitialLoadObserverInterface implementation
* @param retryAfter Time to wait between retries (2 seconds by default)
* @return A std::vector<std::string> The ledger data
*/
virtual std::vector<std::string>
loadInitialLedger(
uint32_t sequence,
etlng::InitialLoadObserverInterface& loader,
std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}
) = 0;
/**
* @brief Load the initial ledger, writing data to the queue.
* @note This function will retry indefinitely until the ledger is downloaded.
*
* @param sequence Sequence of ledger to download
* @param retryAfter Time to wait between retries (2 seconds by default)
* @return A std::vector<std::string> The ledger data
*/
virtual std::vector<std::string>
loadInitialLedger(uint32_t sequence, std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}) = 0;
/**
* @brief Fetch data for a specific ledger.
*
* This function will continuously try to fetch data for the specified ledger until the fetch succeeds, the ledger
* is found in the database, or the server is shutting down.
*
* @param ledgerSequence Sequence of the ledger to fetch
* @param getObjects Whether to get the account state diff between this ledger and the prior one
* @param getObjectNeighbors Whether to request object neighbors
* @param retryAfter Time to wait between retries (2 seconds by default)
* @return The extracted data, if extraction was successful. If the ledger was found
* in the database or the server is shutting down, the optional will be empty
*/
virtual OptionalGetLedgerResponseType
fetchLedger(
uint32_t ledgerSequence,
bool getObjects,
bool getObjectNeighbors,
std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}
) = 0;
/**
* @brief Represent the state of this load balancer as a JSON object
*
* @return JSON representation of the state of this load balancer.
*/
virtual boost::json::value
toJson() const = 0;
/**
* @brief Forward a JSON RPC request to a randomly selected rippled node.
*
* @param request JSON-RPC request to forward
* @param clientIp The IP address of the peer, if known
* @param isAdmin Whether the request is from an admin
* @param yield The coroutine context
* @return Response received from rippled node as JSON object on success or error on failure
*/
virtual std::expected<boost::json::object, rpc::ClioError>
forwardToRippled(
boost::json::object const& request,
std::optional<std::string> const& clientIp,
bool isAdmin,
boost::asio::yield_context yield
) = 0;
/**
* @brief Return state of ETL nodes.
* @return ETL state, nullopt if etl nodes not available
*/
virtual std::optional<etl::ETLState>
getETLState() noexcept = 0;
};
} // namespace etlng

View File

@@ -0,0 +1,52 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "etlng/Models.hpp"
#include <xrpl/protocol/LedgerHeader.h>
#include <optional>
namespace etlng {
/**
* @brief An interface for a ETL Loader
*/
struct LoaderInterface {
virtual ~LoaderInterface() = default;
/**
* @brief Load ledger data
* @param data The data to load
*/
virtual void
load(model::LedgerData const& data) = 0;
/**
* @brief Load the initial ledger
* @param data The data to load
* @return Optional ledger header
*/
virtual std::optional<ripple::LedgerHeader>
loadInitialLedger(model::LedgerData const& data) = 0;
};
} // namespace etlng

View File

@@ -29,6 +29,7 @@
#include <xrpl/proto/org/xrpl/rpc/v1/ledger.pb.h>
#include <xrpl/protocol/LedgerHeader.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/Serializer.h>
#include <xrpl/protocol/TxFormats.h>
#include <xrpl/protocol/TxMeta.h>
@@ -49,7 +50,7 @@ namespace etlng::model {
template <ripple::TxType... Types>
requires(util::hasNoDuplicates(Types...))
struct Spec {
static constexpr bool SpecTag = true;
static constexpr bool kSPEC_TAG = true;
/**
* @brief Checks if the transaction type was requested.
@@ -57,7 +58,7 @@ struct Spec {
* @param type The transaction type
* @return true if the transaction was requested; false otherwise
*/
[[nodiscard]] constexpr static bool
[[nodiscard]] static constexpr bool
wants(ripple::TxType type) noexcept
{
return ((Types == type) || ...);
@@ -79,6 +80,23 @@ struct Transaction {
ripple::uint256 id;
std::string key; // key is the above id as a string of 32 characters
ripple::TxType type;
/**
* @brief Compares Transaction objects to each other without considering sttx and meta fields
* @param other The Transaction to compare to
* @return true if transaction is equivalent; false otherwise
*/
bool
operator==(Transaction const& other) const
{
return raw == other.raw //
and metaRaw == other.metaRaw //
and sttx.getTransactionID() == other.sttx.getTransactionID() //
and meta.getTxID() == other.meta.getTxID() //
and id == other.id //
and key == other.key //
and type == other.type;
}
};
/**
@@ -103,6 +121,9 @@ struct Object {
std::string predecessor;
ModType type;
bool
operator==(Object const&) const = default;
};
/**
@@ -111,6 +132,9 @@ struct Object {
struct BookSuccessor {
std::string firstBook;
std::string bookBase;
bool
operator==(BookSuccessor const&) const = default;
};
/**
@@ -125,6 +149,45 @@ struct LedgerData {
ripple::LedgerHeader header;
std::string rawHeader;
uint32_t seq;
/**
* @brief Compares LedgerData objects to each other without considering the header field
* @param other The LedgerData to compare to
* @return true if data is equivalent; false otherwise
*/
bool
operator==(LedgerData const& other) const
{
auto const serialized = [](auto const& hdr) {
ripple::Serializer ser;
ripple::addRaw(hdr, ser);
return ser.getString();
};
return transactions == other.transactions //
and objects == other.objects //
and successors == other.successors //
and edgeKeys == other.edgeKeys //
and serialized(header) == serialized(other.header) //
and rawHeader == other.rawHeader //
and seq == other.seq;
}
};
/**
* @brief Represents a task for the extractors
*/
struct Task {
/**
* @brief Represents the priority of the task
*/
enum class Priority : uint8_t {
Lower = 0u,
Higher = 1u,
};
Priority priority;
uint32_t seq;
};
} // namespace etlng::model

View File

@@ -0,0 +1,42 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "etlng/Models.hpp"
#include <optional>
namespace etlng {
/**
* @brief The interface of a scheduler for the extraction proccess
*/
struct SchedulerInterface {
virtual ~SchedulerInterface() = default;
/**
* @brief Attempt to obtain the next task
* @return A task if one exists; std::nullopt otherwise
*/
[[nodiscard]] virtual std::optional<model::Task>
next() = 0;
};
} // namespace etlng

View File

@@ -0,0 +1,56 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etlng/impl/AmendmentBlockHandler.hpp"
#include "etl/SystemState.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/log/Logger.hpp"
#include <chrono>
#include <functional>
#include <utility>
namespace etlng::impl {
AmendmentBlockHandler::ActionType const AmendmentBlockHandler::kDEFAULT_AMENDMENT_BLOCK_ACTION = []() {
static util::Logger const log{"ETL"}; // NOLINT(readability-identifier-naming)
LOG(log.fatal()) << "Can't process new ledgers: The current ETL source is not compatible with the version of "
<< "the libxrpl Clio is currently using. Please upgrade Clio to a newer version.";
};
AmendmentBlockHandler::AmendmentBlockHandler(
util::async::AnyExecutionContext&& ctx,
etl::SystemState& state,
std::chrono::steady_clock::duration interval,
ActionType action
)
: state_{std::ref(state)}, interval_{interval}, ctx_{std::move(ctx)}, action_{std::move(action)}
{
}
void
AmendmentBlockHandler::notifyAmendmentBlocked()
{
state_.get().isAmendmentBlocked = true;
if (not operation_.has_value())
operation_.emplace(ctx_.executeRepeatedly(interval_, action_));
}
} // namespace etlng::impl

View File

@@ -0,0 +1,69 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "etl/SystemState.hpp"
#include "etlng/AmendmentBlockHandlerInterface.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyOperation.hpp"
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/steady_timer.hpp>
#include <chrono>
#include <functional>
#include <optional>
namespace etlng::impl {
class AmendmentBlockHandler : public AmendmentBlockHandlerInterface {
public:
using ActionType = std::function<void()>;
private:
std::reference_wrapper<etl::SystemState> state_;
std::chrono::steady_clock::duration interval_;
util::async::AnyExecutionContext ctx_;
std::optional<util::async::AnyOperation<void>> operation_;
ActionType action_;
public:
static ActionType const kDEFAULT_AMENDMENT_BLOCK_ACTION;
AmendmentBlockHandler(
util::async::AnyExecutionContext&& ctx,
etl::SystemState& state,
std::chrono::steady_clock::duration interval = std::chrono::seconds{1},
ActionType action = kDEFAULT_AMENDMENT_BLOCK_ACTION
);
~AmendmentBlockHandler() override
{
if (operation_.has_value())
operation_.value().abort();
}
void
notifyAmendmentBlocked() override;
};
} // namespace etlng::impl

View File

@@ -87,14 +87,14 @@ AsyncGrpcCall::process(
if (abort) {
LOG(log_.error()) << "AsyncGrpcCall aborted";
return CallStatus::ERRORED;
return CallStatus::Errored;
}
if (!status_.ok()) {
LOG(log_.error()) << "AsyncGrpcCall status_ not ok: code = " << status_.error_code()
<< " message = " << status_.error_message();
return CallStatus::ERRORED;
return CallStatus::Errored;
}
if (!next_->is_unlimited()) {
@@ -141,7 +141,7 @@ AsyncGrpcCall::process(
predecessorKey_ = lastKey_; // but for ongoing onInitialObjects calls we need to pass along the key we left
// off at so that we can link the two lists correctly
return more ? CallStatus::MORE : CallStatus::DONE;
return more ? CallStatus::More : CallStatus::Done;
}
void

View File

@@ -38,7 +38,7 @@ namespace etlng::impl {
class AsyncGrpcCall {
public:
enum class CallStatus { MORE, DONE, ERRORED };
enum class CallStatus { More, Done, Errored };
using RequestType = org::xrpl::rpc::v1::GetLedgerDataRequest;
using ResponseType = org::xrpl::rpc::v1::GetLedgerDataResponse;
using StubType = org::xrpl::rpc::v1::XRPLedgerAPIService::Stub;

View File

@@ -118,8 +118,8 @@ extractObj(PBObjType obj)
.keyRaw = std::move(*obj.mutable_key()),
.data = {obj.mutable_data()->begin(), obj.mutable_data()->end()},
.dataRaw = std::move(*obj.mutable_data()),
.successor = valueOr(obj.successor(), uint256ToString(data::firstKey)),
.predecessor = valueOr(obj.predecessor(), uint256ToString(data::lastKey)),
.successor = valueOr(obj.successor(), uint256ToString(data::kFIRST_KEY)),
.predecessor = valueOr(obj.predecessor(), uint256ToString(data::kLAST_KEY)),
.type = extractModType(obj.mod_type()),
};
}

View File

@@ -139,7 +139,7 @@ GrpcSource::loadInitialLedger(
LOG(log_.trace()) << "Marker prefix = " << ptr->getMarkerPrefix();
auto result = ptr->process(stub_, queue, observer, abort);
if (result != AsyncGrpcCall::CallStatus::MORE) {
if (result != AsyncGrpcCall::CallStatus::More) {
++numFinished;
LOG(log_.debug()) << "Finished a marker. Current number of finished = " << numFinished;
@@ -147,7 +147,7 @@ GrpcSource::loadInitialLedger(
edgeKeys.push_back(std::move(lastKey));
}
if (result == AsyncGrpcCall::CallStatus::ERRORED)
if (result == AsyncGrpcCall::CallStatus::Errored)
abort = true;
}

111
src/etlng/impl/Loading.cpp Normal file
View File

@@ -0,0 +1,111 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etlng/impl/Loading.hpp"
#include "data/BackendInterface.hpp"
#include "etl/LedgerFetcherInterface.hpp"
#include "etl/impl/LedgerLoader.hpp"
#include "etlng/AmendmentBlockHandlerInterface.hpp"
#include "etlng/Models.hpp"
#include "etlng/RegistryInterface.hpp"
#include "util/Assert.hpp"
#include "util/LedgerUtils.hpp"
#include "util/Profiler.hpp"
#include "util/log/Logger.hpp"
#include <xrpl/protocol/LedgerHeader.h>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
namespace etlng::impl {
Loader::Loader(
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<etl::LedgerFetcherInterface> fetcher,
std::shared_ptr<RegistryInterface> registry,
std::shared_ptr<AmendmentBlockHandlerInterface> amendmentBlockHandler
)
: backend_(std::move(backend))
, fetcher_(std::move(fetcher))
, registry_(std::move(registry))
, amendmentBlockHandler_(std::move(amendmentBlockHandler))
{
}
void
Loader::load(model::LedgerData const& data)
{
try {
// perform cache updates and all writes from extensions
registry_->dispatch(data);
auto [success, duration] =
::util::timed<std::chrono::duration<double>>([&]() { return backend_->finishWrites(data.seq); });
LOG(log_.info()) << "Finished writes to DB for " << data.seq << ": " << (success ? "YES" : "NO") << "; took "
<< duration;
} catch (std::runtime_error const& e) {
LOG(log_.fatal()) << "Failed to load " << data.seq << ": " << e.what();
amendmentBlockHandler_->notifyAmendmentBlocked();
}
};
void
Loader::onInitialLoadGotMoreObjects(
uint32_t seq,
std::vector<model::Object> const& data,
std::optional<std::string> lastKey
)
{
LOG(log_.debug()) << "On initial load: got more objects for seq " << seq << ". size = " << data.size();
registry_->dispatchInitialObjects(
seq, data, std::move(lastKey).value_or(std::string{}) // TODO: perhaps use optional all the way to extensions?
);
}
std::optional<ripple::LedgerHeader>
Loader::loadInitialLedger(model::LedgerData const& data)
{
// check that database is actually empty
auto rng = backend_->hardFetchLedgerRangeNoThrow();
if (rng) {
ASSERT(false, "Database is not empty");
return std::nullopt;
}
LOG(log_.debug()) << "Deserialized ledger header. " << ::util::toString(data.header);
auto seconds = ::util::timed<std::chrono::seconds>([this, &data]() { registry_->dispatchInitialData(data); });
LOG(log_.info()) << "Dispatching initial data and submitting all writes took " << seconds << " seconds.";
backend_->finishWrites(data.seq);
LOG(log_.debug()) << "Loaded initial ledger";
return {data.header};
}
} // namespace etlng::impl

View File

@@ -0,0 +1,85 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "data/BackendInterface.hpp"
#include "etl/LedgerFetcherInterface.hpp"
#include "etl/impl/LedgerLoader.hpp"
#include "etlng/AmendmentBlockHandlerInterface.hpp"
#include "etlng/InitialLoadObserverInterface.hpp"
#include "etlng/LoaderInterface.hpp"
#include "etlng/Models.hpp"
#include "etlng/RegistryInterface.hpp"
#include "util/log/Logger.hpp"
#include <org/xrpl/rpc/v1/ledger.pb.h>
#include <xrpl/basics/Slice.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/basics/strHex.h>
#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger.pb.h>
#include <xrpl/protocol/LedgerHeader.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/Serializer.h>
#include <xrpl/protocol/TxMeta.h>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <vector>
namespace etlng::impl {
class Loader : public LoaderInterface, public InitialLoadObserverInterface {
std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<etl::LedgerFetcherInterface> fetcher_;
std::shared_ptr<RegistryInterface> registry_;
std::shared_ptr<AmendmentBlockHandlerInterface> amendmentBlockHandler_;
util::Logger log_{"ETL"};
public:
using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject;
using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse;
using OptionalGetLedgerResponseType = std::optional<GetLedgerResponseType>;
Loader(
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<etl::LedgerFetcherInterface> fetcher,
std::shared_ptr<RegistryInterface> registry,
std::shared_ptr<AmendmentBlockHandlerInterface> amendmentBlockHandler
);
void
load(model::LedgerData const& data) override;
void
onInitialLoadGotMoreObjects(
uint32_t seq,
std::vector<model::Object> const& data,
std::optional<std::string> lastKey
) override;
std::optional<ripple::LedgerHeader>
loadInitialLedger(model::LedgerData const& data) override;
};
} // namespace etlng::impl

View File

@@ -72,7 +72,7 @@ concept HasInitialObjectHook = requires(T p) {
};
template <typename T>
concept ContainsSpec = std::decay_t<T>::spec::SpecTag;
concept ContainsSpec = std::decay_t<T>::spec::kSPEC_TAG;
template <typename T>
concept ContainsValidHook = HasLedgerDataHook<T> or HasInitialDataHook<T> or

View File

@@ -0,0 +1,151 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etlng/Models.hpp"
#include "etlng/SchedulerInterface.hpp"
#include <sys/types.h>
#include <atomic>
#include <cstdint>
#include <functional>
#include <limits>
#include <memory>
#include <optional>
#include <tuple>
#include <type_traits>
#include <utility>
namespace etlng::impl {
template <typename T>
concept SomeScheduler = std::is_base_of_v<SchedulerInterface, std::decay_t<T>>;
class ForwardScheduler : public SchedulerInterface {
std::reference_wrapper<etl::NetworkValidatedLedgersInterface> ledgers_;
uint32_t startSeq_;
std::optional<uint32_t> maxSeq_;
std::atomic_uint32_t seq_;
public:
ForwardScheduler(ForwardScheduler const& other)
: ledgers_(other.ledgers_), startSeq_(other.startSeq_), maxSeq_(other.maxSeq_), seq_(other.seq_.load())
{
}
ForwardScheduler(
std::reference_wrapper<etl::NetworkValidatedLedgersInterface> ledgers,
uint32_t startSeq,
std::optional<uint32_t> maxSeq = std::nullopt
)
: ledgers_(ledgers), startSeq_(startSeq), maxSeq_(maxSeq), seq_(startSeq)
{
}
[[nodiscard]] std::optional<model::Task>
next() override
{
static constexpr auto kMAX = std::numeric_limits<uint32_t>::max();
uint32_t currentSeq = seq_;
if (ledgers_.get().getMostRecent() >= currentSeq) {
while (currentSeq < maxSeq_.value_or(kMAX)) {
if (seq_.compare_exchange_weak(currentSeq, currentSeq + 1u, std::memory_order_acq_rel)) {
return {{.priority = model::Task::Priority::Higher, .seq = currentSeq}};
}
}
}
return std::nullopt;
}
};
class BackfillScheduler : public SchedulerInterface {
uint32_t startSeq_;
uint32_t minSeq_ = 0u;
std::atomic_uint32_t seq_;
public:
BackfillScheduler(BackfillScheduler const& other)
: startSeq_(other.startSeq_), minSeq_(other.minSeq_), seq_(other.seq_.load())
{
}
BackfillScheduler(uint32_t startSeq, std::optional<uint32_t> minSeq = std::nullopt)
: startSeq_(startSeq), minSeq_(minSeq.value_or(0)), seq_(startSeq)
{
}
[[nodiscard]] std::optional<model::Task>
next() override
{
uint32_t currentSeq = seq_;
while (currentSeq > minSeq_) {
if (seq_.compare_exchange_weak(currentSeq, currentSeq - 1u, std::memory_order_acq_rel)) {
return {{.priority = model::Task::Priority::Lower, .seq = currentSeq}};
}
}
return std::nullopt;
}
};
template <SomeScheduler... Schedulers>
class SchedulerChain : public SchedulerInterface {
std::tuple<Schedulers...> schedulers_;
public:
template <SomeScheduler... Ts>
requires(std::is_same_v<Ts, Schedulers> and ...)
SchedulerChain(Ts&&... schedulers) : schedulers_(std::forward<Ts>(schedulers)...)
{
}
[[nodiscard]] std::optional<model::Task>
next() override
{
std::optional<model::Task> task;
auto const expand = [&](auto& s) {
if (task.has_value())
return false;
task = s.next();
return task.has_value();
};
std::apply([&expand](auto&&... xs) { (... || expand(xs)); }, schedulers_);
return task;
}
};
static auto
makeScheduler(SomeScheduler auto&&... schedulers)
{
return std::make_unique<SchedulerChain<std::decay_t<decltype(schedulers)>...>>(
std::forward<decltype(schedulers)>(schedulers)...
);
}
} // namespace etlng::impl

View File

@@ -77,7 +77,7 @@ public:
* @return A shared pointer to a new instance of SubscriptionManager
*/
static std::shared_ptr<SubscriptionManager>
make_SubscriptionManager(
makeSubscriptionManager(
util::config::ClioConfigDefinition const& config,
std::shared_ptr<data::BackendInterface const> const& backend
)

View File

@@ -19,12 +19,12 @@
#include "app/CliArgs.hpp"
#include "app/ClioApplication.hpp"
#include "app/VerifyConfig.hpp"
#include "migration/MigrationApplication.hpp"
#include "rpc/common/impl/HandlerProvider.hpp"
#include "util/TerminationHandler.hpp"
#include "util/log/Logger.hpp"
#include "util/newconfig/ConfigDefinition.hpp"
#include "util/newconfig/ConfigFileJson.hpp"
#include <cstdlib>
#include <exception>
@@ -40,36 +40,27 @@ try {
auto const action = app::CliArgs::parse(argc, argv);
return action.apply(
[](app::CliArgs::Action::Exit const& exit) { return exit.exitCode; },
[](app::CliArgs::Action::VerifyConfig const& verify) {
if (app::verifyConfig(verify.configPath)) {
std::cout << "Config " << verify.configPath << " is correct" << "\n";
return EXIT_SUCCESS;
}
return EXIT_FAILURE;
},
[](app::CliArgs::Action::Run const& run) {
auto const json = ConfigFileJson::make_ConfigFileJson(run.configPath);
if (!json.has_value()) {
std::cerr << json.error().error << std::endl;
if (app::verifyConfig(verify.configPath))
return EXIT_FAILURE;
}
auto const errors = ClioConfig.parse(json.value());
if (errors.has_value()) {
for (auto const& err : errors.value())
std::cerr << err.error << std::endl;
return EXIT_FAILURE;
}
util::LogService::init(ClioConfig);
app::ClioApplication clio{ClioConfig};
util::LogService::init(gClioConfig);
app::ClioApplication clio{gClioConfig};
return clio.run(run.useNgWebServer);
},
[](app::CliArgs::Action::Migrate const& migrate) {
auto const json = ConfigFileJson::make_ConfigFileJson(migrate.configPath);
if (!json.has_value()) {
std::cerr << json.error().error << std::endl;
if (app::verifyConfig(verify.configPath))
return EXIT_FAILURE;
}
auto const errors = ClioConfig.parse(json.value());
if (errors.has_value()) {
for (auto const& err : errors.value())
std::cerr << err.error << std::endl;
return EXIT_FAILURE;
}
util::LogService::init(ClioConfig);
app::MigratorApplication migrator{ClioConfig, migrate.subCmd};
util::LogService::init(gClioConfig);
app::MigratorApplication migrator{gClioConfig, migrate.subCmd};
return migrator.run();
}
);

View File

@@ -79,7 +79,7 @@ public:
fromString(std::string const& statusStr);
private:
static constexpr std::array<char const*, static_cast<size_t>(NumStatuses)> statusStrMap = {
static constexpr std::array<char const*, static_cast<size_t>(NumStatuses)> kSTATUS_STR_MAP = {
"Migrated",
"NotMigrated",
"NotKnown"

View File

@@ -39,14 +39,14 @@ MigratorStatus::operator==(Status const& other) const
std::string
MigratorStatus::toString() const
{
return statusStrMap[static_cast<size_t>(status_)];
return kSTATUS_STR_MAP[static_cast<size_t>(status_)];
}
MigratorStatus
MigratorStatus::fromString(std::string const& statusStr)
{
for (std::size_t i = 0; i < statusStrMap.size(); ++i) {
if (statusStr == statusStrMap[i]) {
for (std::size_t i = 0; i < kSTATUS_STR_MAP.size(); ++i) {
if (statusStr == kSTATUS_STR_MAP[i]) {
return MigratorStatus(static_cast<Status>(i));
}
}

View File

@@ -76,23 +76,23 @@ public:
)
{
LOG(log_.debug()) << "Travsering token range: " << start << " - " << end
<< " ; table: " << TableDesc::TABLE_NAME;
<< " ; table: " << TableDesc::kTABLE_NAME;
// for each table we only have one prepared statement
static auto statementPrepared =
migrationSchema_.getPreparedFullScanStatement(handle_, TableDesc::TABLE_NAME, TableDesc::PARTITION_KEY);
static auto kSTATEMENT_PREPARED =
migrationSchema_.getPreparedFullScanStatement(handle_, TableDesc::kTABLE_NAME, TableDesc::kPARTITION_KEY);
auto const statement = statementPrepared.bind(start, end);
auto const statement = kSTATEMENT_PREPARED.bind(start, end);
auto const res = this->executor_.read(yield, statement);
if (not res) {
LOG(log_.error()) << "Could not fetch data from table: " << TableDesc::TABLE_NAME << " range: " << start
LOG(log_.error()) << "Could not fetch data from table: " << TableDesc::kTABLE_NAME << " range: " << start
<< " - " << end << ";" << res.error();
return;
}
auto const& results = res.value();
if (not results.hasRows()) {
LOG(log_.debug()) << "No rows returned - table: " << TableDesc::TABLE_NAME << " range: " << start << " - "
LOG(log_.debug()) << "No rows returned - table: " << TableDesc::kTABLE_NAME << " range: " << start << " - "
<< end;
return;
}

View File

@@ -84,7 +84,7 @@ public:
data::cassandra::PreparedStatement const&
getPreparedInsertMigratedMigrator(data::cassandra::Handle const& handler)
{
static auto prepared = handler.prepare(fmt::format(
static auto kPREPARED = handler.prepare(fmt::format(
R"(
INSERT INTO {}
(migrator_name, status)
@@ -92,7 +92,7 @@ public:
)",
data::cassandra::qualifiedTableName<SettingsProviderType>(settingsProvider_.get(), "migrator_status")
));
return prepared;
return kPREPARED;
}
};
} // namespace migration::cassandra::impl

View File

@@ -74,9 +74,9 @@ class FullTableScanner {
* @brief The helper to generate the token ranges.
*/
struct TokenRangesProvider {
uint32_t numRanges_;
uint32_t numRanges;
TokenRangesProvider(uint32_t numRanges) : numRanges_{numRanges}
TokenRangesProvider(uint32_t numRanges) : numRanges{numRanges}
{
}
@@ -85,18 +85,18 @@ class FullTableScanner {
{
auto const minValue = std::numeric_limits<std::int64_t>::min();
auto const maxValue = std::numeric_limits<std::int64_t>::max();
if (numRanges_ == 1)
if (numRanges == 1)
return {TokenRange{minValue, maxValue}};
// Safely calculate the range size using uint64_t to avoid overflow
uint64_t const rangeSize = (static_cast<uint64_t>(maxValue) * 2) / numRanges_;
uint64_t const rangeSize = (static_cast<uint64_t>(maxValue) * 2) / numRanges;
std::vector<TokenRange> ranges;
ranges.reserve(numRanges_);
ranges.reserve(numRanges);
for (std::int64_t i = 0; i < numRanges_; ++i) {
for (std::int64_t i = 0; i < numRanges; ++i) {
int64_t const start = minValue + (i * rangeSize);
int64_t const end = (i == numRanges_ - 1) ? maxValue : start + static_cast<int64_t>(rangeSize) - 1;
int64_t const end = (i == numRanges - 1) ? maxValue : start + static_cast<int64_t>(rangeSize) - 1;
ranges.emplace_back(start, end);
}

View File

@@ -42,8 +42,8 @@ namespace migration::cassandra::impl {
*/
struct TableObjectsDesc {
using Row = std::tuple<ripple::uint256, std::uint32_t, data::Blob>;
static constexpr char const* PARTITION_KEY = "key";
static constexpr char const* TABLE_NAME = "objects";
static constexpr char const* kPARTITION_KEY = "key";
static constexpr char const* kTABLE_NAME = "objects";
};
/**

View File

@@ -35,7 +35,7 @@ concept TableSpec = requires {
requires std::tuple_size<typename T::Row>::value >= 0; // Ensures 'row' is a tuple
// Check that static constexpr members 'partitionKey' and 'tableName' exist
{ T::PARTITION_KEY } -> std::convertible_to<char const*>;
{ T::TABLE_NAME } -> std::convertible_to<char const*>;
{ T::kPARTITION_KEY } -> std::convertible_to<char const*>;
{ T::kTABLE_NAME } -> std::convertible_to<char const*>;
};
} // namespace migration::cassandra::impl

View File

@@ -42,8 +42,8 @@ namespace migration::cassandra::impl {
struct TableTransactionsDesc {
// hash, date, ledger_seq, metadata, transaction
using Row = std::tuple<ripple::uint256, std::uint64_t, std::uint32_t, ripple::Blob, ripple::Blob>;
static constexpr char const* PARTITION_KEY = "hash";
static constexpr char const* TABLE_NAME = "transactions";
static constexpr char const* kPARTITION_KEY = "hash";
static constexpr char const* kTABLE_NAME = "transactions";
};
/**

View File

@@ -37,7 +37,7 @@ namespace migration::impl {
std::expected<std::shared_ptr<MigrationManagerInterface>, std::string>
makeMigrationManager(util::config::ClioConfigDefinition const& config)
{
static util::Logger const log{"Migration"};
static util::Logger const log{"Migration"}; // NOLINT(readability-identifier-naming)
LOG(log.info()) << "Constructing MigrationManager";
auto const type = config.get<std::string>("database.type");

View File

@@ -66,7 +66,7 @@ class MigratorsRegister {
void
callMigration(std::string const& name, util::config::ObjectView const& config)
{
if (name == Migrator::name) {
if (name == Migrator::kNAME) {
LOG(log_.info()) << "Running migration: " << name;
Migrator::runMigration(backend_, config);
backend_->writeMigratorStatus(name, MigratorStatus(MigratorStatus::Migrated).toString());
@@ -78,7 +78,7 @@ class MigratorsRegister {
static constexpr std::string_view
getDescriptionIfMatch(std::string_view targetName)
{
return (T::name == targetName) ? T::description : "";
return (T::kNAME == targetName) ? T::kDESCRIPTION : "";
}
public:
@@ -156,7 +156,7 @@ public:
constexpr auto
getMigratorNames() const
{
return std::array<std::string_view, sizeof...(MigratorType)>{MigratorType::name...};
return std::array<std::string_view, sizeof...(MigratorType)>{MigratorType::kNAME...};
}
/**

View File

@@ -34,11 +34,11 @@ namespace migration::impl {
*/
template <typename T, typename Backend>
concept MigratorSpec = requires(std::shared_ptr<Backend> const& backend, util::config::ObjectView const& cfg) {
// Check that 'name' exists and is a string
{ T::name } -> std::convertible_to<std::string>;
// Check that 'kNAME' exists and is a string
{ T::kNAME } -> std::convertible_to<std::string>;
// Check that 'description' exists and is a string
{ T::description } -> std::convertible_to<std::string>;
// Check that 'kDESCRIPTION' exists and is a string
{ T::kDESCRIPTION } -> std::convertible_to<std::string>;
// Check that the migrator specifies the backend type it supports
typename T::Backend;

View File

@@ -84,7 +84,7 @@ public:
* @return The new instance
*/
static Counters
make_Counters(WorkQueue const& wq)
makeCounters(WorkQueue const& wq)
{
return Counters{wq};
}

View File

@@ -42,20 +42,20 @@ namespace rpc {
WarningInfo const&
getWarningInfo(WarningCode code)
{
constexpr static WarningInfo infos[]{
{warnUNKNOWN, "Unknown warning"},
{warnRPC_CLIO,
static constexpr WarningInfo kINFOS[]{
{WarnUnknown, "Unknown warning"},
{WarnRpcClio,
"This is a clio server. clio only serves validated data. If you want to talk to rippled, include "
"'ledger_index':'current' in your request"},
{warnRPC_OUTDATED, "This server may be out of date"},
{warnRPC_RATE_LIMIT, "You are about to be rate limited"},
{warnRPC_DEPRECATED,
{WarnRpcOutdated, "This server may be out of date"},
{WarnRpcRateLimit, "You are about to be rate limited"},
{WarnRpcDeprecated,
"Some fields from your request are deprecated. Please check the documentation at "
"https://xrpl.org/docs/references/http-websocket-apis/ and update your request."}
};
auto matchByCode = [code](auto const& info) { return info.code == code; };
if (auto it = std::ranges::find_if(infos, matchByCode); it != end(infos))
if (auto it = ranges::find_if(kINFOS, matchByCode); it != end(kINFOS))
return *it;
throw(out_of_range("Invalid WarningCode"));
@@ -74,44 +74,43 @@ makeWarning(WarningCode code)
ClioErrorInfo const&
getErrorInfo(ClioError code)
{
constexpr static ClioErrorInfo infos[]{
{.code = ClioError::rpcMALFORMED_CURRENCY, .error = "malformedCurrency", .message = "Malformed currency."},
{.code = ClioError::rpcMALFORMED_REQUEST, .error = "malformedRequest", .message = "Malformed request."},
{.code = ClioError::rpcMALFORMED_OWNER, .error = "malformedOwner", .message = "Malformed owner."},
{.code = ClioError::rpcMALFORMED_ADDRESS, .error = "malformedAddress", .message = "Malformed address."},
{.code = ClioError::rpcINVALID_HOT_WALLET, .error = "invalidHotWallet", .message = "Invalid hot wallet."},
{.code = ClioError::rpcUNKNOWN_OPTION, .error = "unknownOption", .message = "Unknown option."},
{.code = ClioError::rpcFIELD_NOT_FOUND_TRANSACTION,
constexpr static ClioErrorInfo kINFOS[]{
{.code = ClioError::RpcMalformedCurrency, .error = "malformedCurrency", .message = "Malformed currency."},
{.code = ClioError::RpcMalformedRequest, .error = "malformedRequest", .message = "Malformed request."},
{.code = ClioError::RpcMalformedOwner, .error = "malformedOwner", .message = "Malformed owner."},
{.code = ClioError::RpcMalformedAddress, .error = "malformedAddress", .message = "Malformed address."},
{.code = ClioError::RpcInvalidHotWallet, .error = "invalidHotWallet", .message = "Invalid hot wallet."},
{.code = ClioError::RpcUnknownOption, .error = "unknownOption", .message = "Unknown option."},
{.code = ClioError::RpcFieldNotFoundTransaction,
.error = "fieldNotFoundTransaction",
.message = "Missing field."},
{.code = ClioError::rpcMALFORMED_ORACLE_DOCUMENT_ID,
{.code = ClioError::RpcMalformedOracleDocumentId,
.error = "malformedDocumentID",
.message = "Malformed oracle_document_id."},
{.code = ClioError::rpcMALFORMED_AUTHORIZED_CREDENTIALS,
{.code = ClioError::RpcMalformedAuthorizedCredentials,
.error = "malformedAuthorizedCredentials",
.message = "Malformed authorized credentials."},
// special system errors
{.code = ClioError::rpcINVALID_API_VERSION, .error = JS(invalid_API_version), .message = "Invalid API version."
},
{.code = ClioError::rpcCOMMAND_IS_MISSING,
{.code = ClioError::RpcInvalidApiVersion, .error = JS(invalid_API_version), .message = "Invalid API version."},
{.code = ClioError::RpcCommandIsMissing,
.error = JS(missingCommand),
.message = "Method is not specified or is not a string."},
{.code = ClioError::rpcCOMMAND_NOT_STRING, .error = "commandNotString", .message = "Method is not a string."},
{.code = ClioError::rpcCOMMAND_IS_EMPTY, .error = "emptyCommand", .message = "Method is an empty string."},
{.code = ClioError::rpcPARAMS_UNPARSEABLE,
{.code = ClioError::RpcCommandNotString, .error = "commandNotString", .message = "Method is not a string."},
{.code = ClioError::RpcCommandIsEmpty, .error = "emptyCommand", .message = "Method is an empty string."},
{.code = ClioError::RpcParamsUnparseable,
.error = "paramsUnparseable",
.message = "Params must be an array holding exactly one object."},
// etl related errors
{.code = ClioError::etlCONNECTION_ERROR, .error = "connectionError", .message = "Couldn't connect to rippled."},
{.code = ClioError::etlREQUEST_ERROR, .error = "requestError", .message = "Error sending request to rippled."},
{.code = ClioError::etlREQUEST_TIMEOUT, .error = "timeout", .message = "Request to rippled timed out."},
{.code = ClioError::etlINVALID_RESPONSE,
{.code = ClioError::EtlConnectionError, .error = "connectionError", .message = "Couldn't connect to rippled."},
{.code = ClioError::EtlRequestError, .error = "requestError", .message = "Error sending request to rippled."},
{.code = ClioError::EtlRequestTimeout, .error = "timeout", .message = "Request to rippled timed out."},
{.code = ClioError::EtlInvalidResponse,
.error = "invalidResponse",
.message = "Rippled returned an invalid response."}
};
auto matchByCode = [code](auto const& info) { return info.code == code; };
if (auto it = std::ranges::find_if(infos, matchByCode); it != end(infos))
if (auto it = ranges::find_if(kINFOS, matchByCode); it != end(kINFOS))
return *it;
throw(out_of_range("Invalid error code"));

View File

@@ -35,30 +35,30 @@ namespace rpc {
/** @brief Custom clio RPC Errors. */
enum class ClioError {
// normal clio errors start with 5000
rpcMALFORMED_CURRENCY = 5000,
rpcMALFORMED_REQUEST = 5001,
rpcMALFORMED_OWNER = 5002,
rpcMALFORMED_ADDRESS = 5003,
rpcINVALID_HOT_WALLET = 5004,
rpcUNKNOWN_OPTION = 5005,
rpcFIELD_NOT_FOUND_TRANSACTION = 5006,
rpcMALFORMED_ORACLE_DOCUMENT_ID = 5007,
rpcMALFORMED_AUTHORIZED_CREDENTIALS = 5008,
RpcMalformedCurrency = 5000,
RpcMalformedRequest = 5001,
RpcMalformedOwner = 5002,
RpcMalformedAddress = 5003,
RpcInvalidHotWallet = 5004,
RpcUnknownOption = 5005,
RpcFieldNotFoundTransaction = 5006,
RpcMalformedOracleDocumentId = 5007,
RpcMalformedAuthorizedCredentials = 5008,
// special system errors start with 6000
rpcINVALID_API_VERSION = 6000,
rpcCOMMAND_IS_MISSING = 6001,
rpcCOMMAND_NOT_STRING = 6002,
rpcCOMMAND_IS_EMPTY = 6003,
rpcPARAMS_UNPARSEABLE = 6004,
RpcInvalidApiVersion = 6000,
RpcCommandIsMissing = 6001,
RpcCommandNotString = 6002,
RpcCommandIsEmpty = 6003,
RpcParamsUnparseable = 6004,
// TODO: Since it is not only rpc errors here now, we should move it to util
// etl related errors start with 7000
// Higher value in this errors means better progress in the forwarding
etlCONNECTION_ERROR = 7000,
etlREQUEST_ERROR = 7001,
etlREQUEST_TIMEOUT = 7002,
etlINVALID_RESPONSE = 7003,
EtlConnectionError = 7000,
EtlRequestError = 7001,
EtlRequestTimeout = 7002,
EtlInvalidResponse = 7003,
};
/** @brief Holds info about a particular @ref ClioError. */
@@ -186,11 +186,11 @@ struct Status {
/** @brief Warning codes that can be returned by clio. */
enum WarningCode {
warnUNKNOWN = -1,
warnRPC_CLIO = 2001,
warnRPC_OUTDATED = 2002,
warnRPC_RATE_LIMIT = 2003,
warnRPC_DEPRECATED = 2004
WarnUnknown = -1,
WarnRpcClio = 2001,
WarnRpcOutdated = 2002,
WarnRpcRateLimit = 2003,
WarnRpcDeprecated = 2004
};
/** @brief Holds information about a clio warning. */
@@ -207,13 +207,13 @@ struct WarningInfo {
{
}
WarningCode code = warnUNKNOWN;
WarningCode code = WarnUnknown;
std::string_view const message = "unknown warning";
};
/** @brief Invalid parameters error. */
class InvalidParamsError : public std::exception {
std::string msg;
std::string msg_;
public:
/**
@@ -221,7 +221,7 @@ public:
*
* @param msg The error message
*/
explicit InvalidParamsError(std::string msg) : msg(std::move(msg))
explicit InvalidParamsError(std::string msg) : msg_(std::move(msg))
{
}
@@ -233,13 +233,13 @@ public:
char const*
what() const throw() override
{
return msg.c_str();
return msg_.c_str();
}
};
/** @brief Account not found error. */
class AccountNotFoundError : public std::exception {
std::string account;
std::string account_;
public:
/**
@@ -247,7 +247,7 @@ public:
*
* @param acct The account
*/
explicit AccountNotFoundError(std::string acct) : account(std::move(acct))
explicit AccountNotFoundError(std::string acct) : account_(std::move(acct))
{
}
@@ -259,12 +259,12 @@ public:
char const*
what() const throw() override
{
return account.c_str();
return account_.c_str();
}
};
/** @brief A globally available @ref rpc::Status that represents a successful state. */
static Status OK;
static Status gOk;
/**
* @brief Get the warning info object from a warning code.

View File

@@ -44,7 +44,7 @@ using namespace util;
namespace rpc {
std::expected<web::Context, Status>
make_WsContext(
makeWsContext(
boost::asio::yield_context yc,
boost::json::object const& request,
web::SubscriptionContextPtr session,
@@ -63,18 +63,18 @@ make_WsContext(
}
if (!commandValue.is_string())
return Error{{ClioError::rpcCOMMAND_IS_MISSING, "Method/Command is not specified or is not a string."}};
return Error{{ClioError::RpcCommandIsMissing, "Method/Command is not specified or is not a string."}};
auto const apiVersion = apiVersionParser.get().parse(request);
if (!apiVersion)
return Error{{ClioError::rpcINVALID_API_VERSION, apiVersion.error()}};
return Error{{ClioError::RpcInvalidApiVersion, apiVersion.error()}};
auto const command = boost::json::value_to<std::string>(commandValue);
return web::Context(yc, command, *apiVersion, request, std::move(session), tagFactory, range, clientIp, isAdmin);
}
std::expected<web::Context, Status>
make_HttpContext(
makeHttpContext(
boost::asio::yield_context yc,
boost::json::object const& request,
util::TagDecoratorFactory const& tagFactory,
@@ -85,13 +85,13 @@ make_HttpContext(
)
{
if (!request.contains("method"))
return Error{{ClioError::rpcCOMMAND_IS_MISSING}};
return Error{{ClioError::RpcCommandIsMissing}};
if (!request.at("method").is_string())
return Error{{ClioError::rpcCOMMAND_NOT_STRING}};
return Error{{ClioError::RpcCommandNotString}};
if (request.at("method").as_string().empty())
return Error{{ClioError::rpcCOMMAND_IS_EMPTY}};
return Error{{ClioError::RpcCommandIsEmpty}};
auto const command = boost::json::value_to<std::string>(request.at("method"));
@@ -99,16 +99,16 @@ make_HttpContext(
return Error{{RippledError::rpcBAD_SYNTAX, "Subscribe and unsubscribe are only allowed for websocket."}};
if (!request.at("params").is_array())
return Error{{ClioError::rpcPARAMS_UNPARSEABLE, "Missing params array."}};
return Error{{ClioError::RpcParamsUnparseable, "Missing params array."}};
boost::json::array const& array = request.at("params").as_array();
if (array.size() != 1 || !array.at(0).is_object())
return Error{{ClioError::rpcPARAMS_UNPARSEABLE}};
return Error{{ClioError::RpcParamsUnparseable}};
auto const apiVersion = apiVersionParser.get().parse(request.at("params").as_array().at(0).as_object());
if (!apiVersion)
return Error{{ClioError::rpcINVALID_API_VERSION, apiVersion.error()}};
return Error{{ClioError::RpcInvalidApiVersion, apiVersion.error()}};
return web::Context(
yc, command, *apiVersion, array.at(0).as_object(), nullptr, tagFactory, range, clientIp, isAdmin

View File

@@ -57,7 +57,7 @@ namespace rpc {
* @return A Websocket context or error Status
*/
std::expected<web::Context, Status>
make_WsContext(
makeWsContext(
boost::asio::yield_context yc,
boost::json::object const& request,
web::SubscriptionContextPtr session,
@@ -81,7 +81,7 @@ make_WsContext(
* @return A HTTP context or error Status
*/
std::expected<web::Context, Status>
make_HttpContext(
makeHttpContext(
boost::asio::yield_context yc,
boost::json::object const& request,
util::TagDecoratorFactory const& tagFactory,

View File

@@ -125,7 +125,7 @@ public:
* @return A new instance of the RPC engine
*/
static std::shared_ptr<RPCEngine>
make_RPCEngine(
makeRPCEngine(
util::config::ClioConfigDefinition const& config,
std::shared_ptr<BackendInterface> const& backend,
std::shared_ptr<LoadBalancerType> const& balancer,

View File

@@ -156,9 +156,9 @@ getDeliveredAmount(
// then its absence indicates that the amount delivered is listed in the
// Amount field. DeliveredAmount went live January 24, 2014.
// 446000000 is in Feb 2014, well after DeliveredAmount went live
static std::uint32_t constexpr FIRST_LEDGER_WITH_DELIVERED_AMOUNT = 4594095;
static std::uint32_t constexpr DELIVERED_AMOUNT_LIVE_DATE = 446000000;
if (ledgerSequence >= FIRST_LEDGER_WITH_DELIVERED_AMOUNT || date > DELIVERED_AMOUNT_LIVE_DATE) {
static constexpr std::uint32_t kFIRST_LEDGER_WITH_DELIVERED_AMOUNT = 4594095;
static constexpr std::uint32_t kDELIVERED_AMOUNT_LIVE_DATE = 446000000;
if (ledgerSequence >= kFIRST_LEDGER_WITH_DELIVERED_AMOUNT || date > kDELIVERED_AMOUNT_LIVE_DATE) {
return txn->getFieldAmount(ripple::sfAmount);
}
}
@@ -286,13 +286,13 @@ toExpandedJson(
std::optional<std::string>
encodeCTID(uint32_t ledgerSeq, uint16_t txnIndex, uint16_t networkId) noexcept
{
static uint32_t constexpr MAX_LEDGER_SEQ = 0x0FFF'FFFF;
if (ledgerSeq > MAX_LEDGER_SEQ)
static constexpr uint32_t kMAX_LEDGER_SEQ = 0x0FFF'FFFF;
if (ledgerSeq > kMAX_LEDGER_SEQ)
return {};
static uint64_t constexpr CTID_PREFIX = 0xC000'0000;
static constexpr uint64_t kCTID_PREFIX = 0xC000'0000;
uint64_t const ctidValue =
((CTID_PREFIX + static_cast<uint64_t>(ledgerSeq)) << 32) + (static_cast<uint64_t>(txnIndex) << 16) + networkId;
((kCTID_PREFIX + static_cast<uint64_t>(ledgerSeq)) << 32) + (static_cast<uint64_t>(txnIndex) << 16) + networkId;
return {fmt::format("{:016X}", ctidValue)};
}
@@ -711,8 +711,8 @@ traverseOwnedNodes(
// Only reserve 2048 nodes when fetching all owned ledger objects. If there
// are more, then keys will allocate more memory, which is suboptimal, but
// should only occur occasionally.
static std::uint32_t constexpr MIN_NODES = 2048;
keys.reserve(std::min(MIN_NODES, limit));
static constexpr std::uint32_t kMIN_NODES = 2048;
keys.reserve(std::min(kMIN_NODES, limit));
auto start = std::chrono::system_clock::now();
@@ -848,10 +848,10 @@ parseRippleLibSeed(boost::json::value const& value)
auto const result = ripple::decodeBase58Token(boost::json::value_to<std::string>(value), ripple::TokenType::None);
static std::size_t constexpr SEED_SIZE = 18;
static std::array<std::uint8_t, 2> constexpr SEED_PREFIX = {0xE1, 0x4B};
if (result.size() == SEED_SIZE && static_cast<std::uint8_t>(result[0]) == SEED_PREFIX[0] &&
static_cast<std::uint8_t>(result[1]) == SEED_PREFIX[1])
static constexpr std::size_t kSEED_SIZE = 18;
static constexpr std::array<std::uint8_t, 2> kSEED_PREFIX = {0xE1, 0x4B};
if (result.size() == kSEED_SIZE && static_cast<std::uint8_t>(result[0]) == kSEED_PREFIX[0] &&
static_cast<std::uint8_t>(result[1]) == kSEED_PREFIX[1])
return ripple::Seed(ripple::makeSlice(result.substr(2)));
return {};
@@ -1212,94 +1212,94 @@ parseBook(boost::json::object const& request)
if (!request.at("taker_gets").is_object())
return Status{RippledError::rpcINVALID_PARAMS, "Field 'taker_gets' is not an object"};
auto taker_pays = request.at("taker_pays").as_object();
if (!taker_pays.contains("currency"))
auto takerPays = request.at("taker_pays").as_object();
if (!takerPays.contains("currency"))
return Status{RippledError::rpcSRC_CUR_MALFORMED};
if (!taker_pays.at("currency").is_string())
if (!takerPays.at("currency").is_string())
return Status{RippledError::rpcSRC_CUR_MALFORMED};
auto taker_gets = request.at("taker_gets").as_object();
if (!taker_gets.contains("currency"))
auto takerGets = request.at("taker_gets").as_object();
if (!takerGets.contains("currency"))
return Status{RippledError::rpcDST_AMT_MALFORMED};
if (!taker_gets.at("currency").is_string()) {
if (!takerGets.at("currency").is_string()) {
return Status{
RippledError::rpcDST_AMT_MALFORMED,
};
}
ripple::Currency pay_currency;
if (!ripple::to_currency(pay_currency, boost::json::value_to<std::string>(taker_pays.at("currency"))))
ripple::Currency payCurrency;
if (!ripple::to_currency(payCurrency, boost::json::value_to<std::string>(takerPays.at("currency"))))
return Status{RippledError::rpcSRC_CUR_MALFORMED};
ripple::Currency get_currency;
if (!ripple::to_currency(get_currency, boost::json::value_to<std::string>(taker_gets["currency"])))
ripple::Currency getCurrency;
if (!ripple::to_currency(getCurrency, boost::json::value_to<std::string>(takerGets["currency"])))
return Status{RippledError::rpcDST_AMT_MALFORMED};
ripple::AccountID pay_issuer;
if (taker_pays.contains("issuer")) {
if (!taker_pays.at("issuer").is_string())
ripple::AccountID payIssuer;
if (takerPays.contains("issuer")) {
if (!takerPays.at("issuer").is_string())
return Status{RippledError::rpcINVALID_PARAMS, "takerPaysIssuerNotString"};
if (!ripple::to_issuer(pay_issuer, boost::json::value_to<std::string>(taker_pays.at("issuer"))))
if (!ripple::to_issuer(payIssuer, boost::json::value_to<std::string>(takerPays.at("issuer"))))
return Status{RippledError::rpcSRC_ISR_MALFORMED};
if (pay_issuer == ripple::noAccount())
if (payIssuer == ripple::noAccount())
return Status{RippledError::rpcSRC_ISR_MALFORMED};
} else {
pay_issuer = ripple::xrpAccount();
payIssuer = ripple::xrpAccount();
}
if (isXRP(pay_currency) && !isXRP(pay_issuer)) {
if (isXRP(payCurrency) && !isXRP(payIssuer)) {
return Status{
RippledError::rpcSRC_ISR_MALFORMED, "Unneeded field 'taker_pays.issuer' for XRP currency specification."
};
}
if (!isXRP(pay_currency) && isXRP(pay_issuer)) {
if (!isXRP(payCurrency) && isXRP(payIssuer)) {
return Status{
RippledError::rpcSRC_ISR_MALFORMED, "Invalid field 'taker_pays.issuer', expected non-XRP issuer."
};
}
if ((!isXRP(pay_currency)) && (!taker_pays.contains("issuer")))
if ((!isXRP(payCurrency)) && (!takerPays.contains("issuer")))
return Status{RippledError::rpcSRC_ISR_MALFORMED, "Missing non-XRP issuer."};
ripple::AccountID get_issuer;
ripple::AccountID getIssuer;
if (taker_gets.contains("issuer")) {
if (!taker_gets["issuer"].is_string())
if (takerGets.contains("issuer")) {
if (!takerGets["issuer"].is_string())
return Status{RippledError::rpcINVALID_PARAMS, "taker_gets.issuer should be string"};
if (!ripple::to_issuer(get_issuer, boost::json::value_to<std::string>(taker_gets.at("issuer"))))
if (!ripple::to_issuer(getIssuer, boost::json::value_to<std::string>(takerGets.at("issuer"))))
return Status{RippledError::rpcDST_ISR_MALFORMED, "Invalid field 'taker_gets.issuer', bad issuer."};
if (get_issuer == ripple::noAccount()) {
if (getIssuer == ripple::noAccount()) {
return Status{
RippledError::rpcDST_ISR_MALFORMED, "Invalid field 'taker_gets.issuer', bad issuer account one."
};
}
} else {
get_issuer = ripple::xrpAccount();
getIssuer = ripple::xrpAccount();
}
if (ripple::isXRP(get_currency) && !ripple::isXRP(get_issuer)) {
if (ripple::isXRP(getCurrency) && !ripple::isXRP(getIssuer)) {
return Status{
RippledError::rpcDST_ISR_MALFORMED, "Unneeded field 'taker_gets.issuer' for XRP currency specification."
};
}
if (!ripple::isXRP(get_currency) && ripple::isXRP(get_issuer)) {
if (!ripple::isXRP(getCurrency) && ripple::isXRP(getIssuer)) {
return Status{
RippledError::rpcDST_ISR_MALFORMED, "Invalid field 'taker_gets.issuer', expected non-XRP issuer."
};
}
if (pay_currency == get_currency && pay_issuer == get_issuer)
if (payCurrency == getCurrency && payIssuer == getIssuer)
return Status{RippledError::rpcBAD_MARKET, "badMarket"};
return ripple::Book{{pay_currency, pay_issuer}, {get_currency, get_issuer}};
return ripple::Book{{payCurrency, payIssuer}, {getCurrency, getIssuer}};
}
std::variant<Status, ripple::AccountID>

View File

@@ -616,8 +616,8 @@ decodeCTID(T const ctid) noexcept
auto const getCTID64 = [](T const ctid) noexcept -> std::optional<uint64_t> {
if constexpr (std::is_convertible_v<T, std::string>) {
std::string const ctidString(ctid);
static std::size_t constexpr CTID_STRING_LENGTH = 16;
if (ctidString.length() != CTID_STRING_LENGTH)
static constexpr std::size_t kCTID_STRING_LENGTH = 16;
if (ctidString.length() != kCTID_STRING_LENGTH)
return {};
if (!boost::regex_match(ctidString, boost::regex("^[0-9A-F]+$")))
@@ -634,10 +634,10 @@ decodeCTID(T const ctid) noexcept
auto const ctidValue = getCTID64(ctid).value_or(0);
static uint64_t constexpr CTID_PREFIX = 0xC000'0000'0000'0000ULL;
static uint64_t constexpr CTID_PREFIX_MASK = 0xF000'0000'0000'0000ULL;
static constexpr uint64_t kCTID_PREFIX = 0xC000'0000'0000'0000ULL;
static constexpr uint64_t kCTID_PREFIX_MASK = 0xF000'0000'0000'0000ULL;
if ((ctidValue & CTID_PREFIX_MASK) != CTID_PREFIX)
if ((ctidValue & kCTID_PREFIX_MASK) != kCTID_PREFIX)
return {};
uint32_t const ledgerSeq = (ctidValue >> 32) & 0xFFFF'FFFUL;
@@ -659,8 +659,8 @@ logDuration(web::Context const& ctx, T const& dur)
{
using boost::json::serialize;
static util::Logger const log{"RPC"};
static std::int64_t constexpr DURATION_ERROR_THRESHOLD_SECONDS = 10;
static util::Logger const log{"RPC"}; // NOLINT(readability-identifier-naming)
static constexpr std::int64_t kDURATION_ERROR_THRESHOLD_SECONDS = 10;
auto const millis = std::chrono::duration_cast<std::chrono::milliseconds>(dur).count();
auto const seconds = std::chrono::duration_cast<std::chrono::seconds>(dur).count();
@@ -668,7 +668,7 @@ logDuration(web::Context const& ctx, T const& dur)
"Request processing duration = {} milliseconds. request = {}", millis, serialize(util::removeSecret(ctx.params))
);
if (seconds > DURATION_ERROR_THRESHOLD_SECONDS) {
if (seconds > kDURATION_ERROR_THRESHOLD_SECONDS) {
LOG(log.error()) << ctx.tag() << msg;
} else if (seconds > 1) {
LOG(log.warn()) << ctx.tag() << msg;

View File

@@ -90,9 +90,9 @@ WorkQueue::stop(std::function<void()> onQueueEmpty)
}
WorkQueue
WorkQueue::make_WorkQueue(util::config::ClioConfigDefinition const& config)
WorkQueue::makeWorkQueue(util::config::ClioConfigDefinition const& config)
{
static util::Logger const log{"RPC"};
static util::Logger const log{"RPC"}; // NOLINT(readability-identifier-naming)
auto const serverConfig = config.getObject("server");
auto const numThreads = config.get<uint32_t>("workers");
auto const maxQueueSize = serverConfig.get<uint32_t>("max_queue_size");

View File

@@ -96,7 +96,7 @@ public:
* @return The work queue
*/
static WorkQueue
make_WorkQueue(util::config::ClioConfigDefinition const& config);
makeWorkQueue(util::config::ClioConfigDefinition const& config);
/**
* @brief Submit a job to the work queue.

Some files were not shown because too many files have changed in this diff Show More