From 8a3e71e91f7cb8694a5e9fcea2cab830c0986ac4 Mon Sep 17 00:00:00 2001 From: Peter Chen <34582813+PeterChen13579@users.noreply.github.com> Date: Wed, 2 Apr 2025 09:53:35 -0400 Subject: [PATCH 01/98] fix: incorrect set HighDeepFreeze flag (#1978) fixes #1977 --- src/rpc/handlers/AccountLines.cpp | 5 +- tests/unit/rpc/RPCHelpersTests.cpp | 6 +- tests/unit/rpc/handlers/AccountLinesTests.cpp | 83 ++++++++++++++++++- 3 files changed, 88 insertions(+), 6 deletions(-) diff --git a/src/rpc/handlers/AccountLines.cpp b/src/rpc/handlers/AccountLines.cpp index 5644e7daf..6459e27d6 100644 --- a/src/rpc/handlers/AccountLines.cpp +++ b/src/rpc/handlers/AccountLines.cpp @@ -86,8 +86,9 @@ AccountLinesHandler::addLine( bool const lineNoRipplePeer = (flags & (not viewLowest ? ripple::lsfLowNoRipple : ripple::lsfHighNoRipple)) != 0u; bool const lineFreeze = (flags & (viewLowest ? ripple::lsfLowFreeze : ripple::lsfHighFreeze)) != 0u; bool const lineFreezePeer = (flags & (not viewLowest ? ripple::lsfLowFreeze : ripple::lsfHighFreeze)) != 0u; - bool const lineDeepFreeze = (flags & (viewLowest ? ripple::lsfLowDeepFreeze : ripple::lsfHighFreeze)) != 0u; - bool const lineDeepFreezePeer = (flags & (not viewLowest ? ripple::lsfLowDeepFreeze : ripple::lsfHighFreeze)) != 0u; + bool const lineDeepFreeze = (flags & (viewLowest ? ripple::lsfLowDeepFreeze : ripple::lsfHighDeepFreeze)) != 0u; + bool const lineDeepFreezePeer = + (flags & (not viewLowest ? ripple::lsfLowDeepFreeze : ripple::lsfHighDeepFreeze)) != 0u; ripple::STAmount const& saBalance = balance; ripple::STAmount const& saLimit = lineLimit; diff --git a/tests/unit/rpc/RPCHelpersTests.cpp b/tests/unit/rpc/RPCHelpersTests.cpp index cfc0c7e03..0dd500d34 100644 --- a/tests/unit/rpc/RPCHelpersTests.cpp +++ b/tests/unit/rpc/RPCHelpersTests.cpp @@ -581,7 +581,7 @@ TEST_F(RPCHelpersTest, FetchAndCheckAnyFlagExists_BlobDoesNotExist) }); } -TEST_F(RPCHelpersTest, FetchAndCheckAnyFlagExistsBlobExists_AccountWithCorrectFlag) +TEST_F(RPCHelpersTest, FetchAndCheckAnyFlagExists_AccountWithCorrectFlag) { auto const account = getAccountIdWithString(kACCOUNT); auto const issuerKey = ripple::keylet::account(account); @@ -600,12 +600,12 @@ TEST_F(RPCHelpersTest, FetchAndCheckAnyFlagExistsBlobExists_AccountWithCorrectFl }); } -TEST_F(RPCHelpersTest, FetchAndCheckAnyFlagExistsBlobExists_AccountFlagDoesNotExist) +TEST_F(RPCHelpersTest, FetchAndCheckAnyFlagExists_TrustLineIsFrozenAndCheckFreezeFlag) { auto const account = getAccountIdWithString(kACCOUNT); auto const issuerKey = ripple::keylet::account(account); - // create account with highDeepFreeze Flag + // create account with lowDeepFreeze Flag auto const accountObject = createAccountRootObject(kACCOUNT, ripple::lsfLowDeepFreeze, 1, 10, 2, kTXN_ID, 3); ON_CALL(*backend_, doFetchLedgerObject(issuerKey.key, kLEDGER_SEQ_OBJECT, _)) diff --git a/tests/unit/rpc/handlers/AccountLinesTests.cpp b/tests/unit/rpc/handlers/AccountLinesTests.cpp index 2d0b72aa8..1b774fd49 100644 --- a/tests/unit/rpc/handlers/AccountLinesTests.cpp +++ b/tests/unit/rpc/handlers/AccountLinesTests.cpp @@ -19,6 +19,7 @@ #include "data/Types.hpp" #include "rpc/Errors.hpp" +#include "rpc/RPCHelpers.hpp" #include "rpc/common/AnyHandler.hpp" #include "rpc/common/Types.hpp" #include "rpc/handlers/AccountLines.hpp" @@ -716,7 +717,7 @@ TEST_F(RPCAccountLinesHandlerTest, EmptyChannel) }); } -TEST_F(RPCAccountLinesHandlerTest, OptionalResponseField) +TEST_F(RPCAccountLinesHandlerTest, OptionalResponseFieldWithDeepFreeze) { static constexpr auto kCORRECT_OUTPUT = R"({ "account": "rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn", @@ -806,6 +807,86 @@ TEST_F(RPCAccountLinesHandlerTest, OptionalResponseField) }); } +TEST_F(RPCAccountLinesHandlerTest, FrozenTrustLineResponse) +{ + static constexpr auto kCORRECT_OUTPUT = R"({ + "account": "rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn", + "ledger_hash": "4BC50C9B0D8515D3EAAE1E74B29A95804346C491EE1A95BF25E4AAB854A6A652", + "ledger_index": 30, + "validated": true, + "limit": 200, + "lines": [ + { + "account": "rLEsXccBGNR3UPuPu2hUXPjziKC3qKSBun", + "balance": "10", + "currency": "USD", + "limit": "100", + "limit_peer": "200", + "quality_in": 0, + "quality_out": 0, + "peer_authorized": true, + "freeze_peer": true + }, + { + "account": "rLEsXccBGNR3UPuPu2hUXPjziKC3qKSBun", + "balance": "20", + "currency": "USD", + "limit": "200", + "limit_peer": "400", + "quality_in": 0, + "quality_out": 0, + "authorized": true, + "freeze": true + } + ] + })"; + + auto ledgerHeader = createLedgerHeader(kLEDGER_HASH, 30); + EXPECT_CALL(*backend_, fetchLedgerBySequence).WillOnce(Return(ledgerHeader)); + + // fetch account object return something + auto account = getAccountIdWithString(kACCOUNT); + auto accountKk = ripple::keylet::account(account).key; + auto owneDirKk = ripple::keylet::ownerDir(account).key; + auto fake = Blob{'f', 'a', 'k', 'e'}; + + // return a non empty account + ON_CALL(*backend_, doFetchLedgerObject(accountKk, testing::_, testing::_)).WillByDefault(Return(fake)); + + // return owner index + ripple::STObject const ownerDir = + createOwnerDirLedgerObject({ripple::uint256{kINDEX1}, ripple::uint256{kINDEX2}}, kINDEX1); + + ON_CALL(*backend_, doFetchLedgerObject(owneDirKk, testing::_, testing::_)) + .WillByDefault(Return(ownerDir.getSerializer().peekData())); + + // return few trust lines + std::vector bbs; + auto line1 = createRippleStateLedgerObject("USD", kACCOUNT2, 10, kACCOUNT, 100, kACCOUNT2, 200, kTXN_ID, 0); + line1.setFlag(ripple::lsfHighAuth); + line1.setFlag(ripple::lsfHighFreeze); + bbs.push_back(line1.getSerializer().peekData()); + + auto line2 = createRippleStateLedgerObject("USD", kACCOUNT2, 20, kACCOUNT, 200, kACCOUNT2, 400, kTXN_ID, 0); + line2.setFlag(ripple::lsfLowAuth); + line2.setFlag(ripple::lsfLowFreeze); + bbs.push_back(line2.getSerializer().peekData()); + + EXPECT_CALL(*backend_, doFetchLedgerObjects).WillOnce(Return(bbs)); + auto const input = json::parse(fmt::format( + R"({{ + "account": "{}" + }})", + kACCOUNT + )); + runSpawn([&, this](auto yield) { + auto handler = AnyHandler{AccountLinesHandler{this->backend_}}; + auto const output = handler.process(input, Context{yield}); + ASSERT_TRUE(output); + EXPECT_EQ(json::parse(kCORRECT_OUTPUT), *output.result); + }); +} + // normal case : test marker output correct TEST_F(RPCAccountLinesHandlerTest, MarkerOutput) { From bdf7382d44ce51be3921f00de28f8a4860721707 Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Wed, 2 Apr 2025 16:50:45 +0100 Subject: [PATCH 02/98] chore: Update maintainers (#1987) --- CONTRIBUTING.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 7be9bc78f..5140a2daf 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -27,7 +27,7 @@ The pre-commit hook requires `clang-format >= 19.0.0` and `cmake-format` to be i The hook will also attempt to automatically use `doxygen` to verify that everything public in the codebase is covered by doc comments. If `doxygen` is not installed, the hook will raise a warning suggesting to install `doxygen` for future commits. ## Git commands -This sections offers a detailed look at the git commands you will need to use to get your PR submitted. +This sections offers a detailed look at the git commands you will need to use to get your PR submitted. Please note that there are more than one way to do this and these commands are provided for your convenience. At this point it's assumed that you have already finished working on your feature/bug. @@ -144,12 +144,13 @@ Existing maintainers can resign, or be subject to a vote for removal at the behe ## Existing Maintainers -* [cindyyan317](https://github.com/cindyyan317) (Ripple) * [godexsoft](https://github.com/godexsoft) (Ripple) * [kuznetsss](https://github.com/kuznetsss) (Ripple) * [legleux](https://github.com/legleux) (Ripple) +* [PeterChen13579](https://github.com/PeterChen13579) (Ripple) ## Honorable ex-Maintainers +* [cindyyan317](https://github.com/cindyyan317) (ex-Ripple) * [cjcobb23](https://github.com/cjcobb23) (ex-Ripple) * [natenichols](https://github.com/natenichols) (ex-Ripple) From 91484c64e4faefdb2951c6dbb63515c74bd8fa8a Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 3 Apr 2025 11:08:47 +0100 Subject: [PATCH 03/98] style: clang-tidy auto fixes (#1989) Fixes #1988. Please review and commit clang-tidy fixes. Co-authored-by: godexsoft <385326+godexsoft@users.noreply.github.com> --- tests/unit/rpc/handlers/AccountLinesTests.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/rpc/handlers/AccountLinesTests.cpp b/tests/unit/rpc/handlers/AccountLinesTests.cpp index 1b774fd49..32cb3747a 100644 --- a/tests/unit/rpc/handlers/AccountLinesTests.cpp +++ b/tests/unit/rpc/handlers/AccountLinesTests.cpp @@ -19,7 +19,6 @@ #include "data/Types.hpp" #include "rpc/Errors.hpp" -#include "rpc/RPCHelpers.hpp" #include "rpc/common/AnyHandler.hpp" #include "rpc/common/Types.hpp" #include "rpc/handlers/AccountLines.hpp" From 6896a2545a867e607309647e36838c9c2c441e63 Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Fri, 4 Apr 2025 15:51:13 +0100 Subject: [PATCH 04/98] chore: Update workflow notification settings (#1990) --- .github/actions/create_issue/action.yml | 4 +--- .github/dependabot.yml | 2 +- .github/workflows/clang-tidy.yml | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/.github/actions/create_issue/action.yml b/.github/actions/create_issue/action.yml index 43d125593..84844c8c3 100644 --- a/.github/actions/create_issue/action.yml +++ b/.github/actions/create_issue/action.yml @@ -14,7 +14,7 @@ inputs: assignees: description: Comma-separated list of assignees required: true - default: 'cindyyan317,godexsoft,kuznetsss' + default: 'godexsoft,kuznetsss,PeterChen13579' outputs: created_issue_id: description: Created issue id @@ -31,5 +31,3 @@ runs: created_issue=$(cat create_issue.log | sed 's|.*/||') echo "created_issue=$created_issue" >> $GITHUB_OUTPUT rm create_issue.log issue.md - - diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 090a77fd7..6549e1f78 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -8,9 +8,9 @@ updates: time: "04:00" timezone: "Etc/GMT" reviewers: - - "cindyyan317" - "godexsoft" - "kuznetsss" + - "PeterChen13579" commit-message: prefix: "[CI] " target-branch: "develop" diff --git a/.github/workflows/clang-tidy.yml b/.github/workflows/clang-tidy.yml index e251aff1d..83f5251c8 100644 --- a/.github/workflows/clang-tidy.yml +++ b/.github/workflows/clang-tidy.yml @@ -111,7 +111,7 @@ jobs: delete-branch: true title: "style: clang-tidy auto fixes" body: "Fixes #${{ steps.create_issue.outputs.created_issue_id }}. Please review and commit clang-tidy fixes." - reviewers: "cindyyan317,godexsoft,kuznetsss" + reviewers: "godexsoft,kuznetsss,PeterChen13579" - name: Fail the job if: ${{ steps.run_clang_tidy.outcome != 'success' }} From 1d011cf8d948e9ee370b9da62392d47856005901 Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Fri, 4 Apr 2025 15:52:22 +0100 Subject: [PATCH 05/98] feat: ETLng integration (#1986) For #1594 --- src/app/ClioApplication.cpp | 16 +- src/app/Stopper.hpp | 13 +- src/etl/ETLService.cpp | 4 +- src/etl/ETLService.hpp | 64 +- src/etl/LoadBalancer.cpp | 9 +- src/etl/LoadBalancer.hpp | 45 +- src/etl/NetworkValidatedLedgersInterface.hpp | 1 + src/etl/Source.hpp | 5 +- src/etl/impl/GrpcSource.cpp | 4 +- src/etl/impl/GrpcSource.hpp | 3 +- src/etl/impl/LedgerFetcher.hpp | 16 +- src/etl/impl/LedgerLoader.hpp | 13 +- src/etl/impl/SourceImpl.hpp | 4 +- src/etlng/CMakeLists.txt | 5 +- src/etlng/ETLService.hpp | 283 +++++++ src/etlng/ETLServiceInterface.hpp | 92 +++ src/etlng/LoadBalancer.cpp | 371 +++++++++ src/etlng/LoadBalancer.hpp | 271 ++++++ src/etlng/LoadBalancerInterface.hpp | 9 + src/etlng/LoaderInterface.hpp | 2 +- src/etlng/Source.cpp | 73 ++ src/etlng/Source.hpp | 194 +++++ src/etlng/impl/ForwardingSource.cpp | 116 +++ src/etlng/impl/ForwardingSource.hpp | 70 ++ src/etlng/impl/Loading.hpp | 1 - src/etlng/impl/Registry.hpp | 9 +- src/etlng/impl/SourceImpl.hpp | 232 ++++++ src/etlng/impl/ext/NFT.hpp | 4 - src/rpc/RPCEngine.hpp | 9 +- src/rpc/common/impl/ForwardingProxy.hpp | 8 +- src/rpc/common/impl/HandlerProvider.cpp | 7 +- src/rpc/common/impl/HandlerProvider.hpp | 10 +- src/rpc/handlers/ServerInfo.hpp | 20 +- src/rpc/handlers/Tx.hpp | 26 +- src/util/newconfig/ConfigDefinition.hpp | 2 +- src/web/RPCServerHandler.hpp | 7 +- src/web/ng/RPCServerHandler.hpp | 7 +- tests/common/util/MockETLService.hpp | 18 +- tests/common/util/MockLoadBalancer.hpp | 19 +- tests/common/util/MockSource.hpp | 6 +- tests/common/util/MockSourceNg.hpp | 245 ++++++ tests/unit/CMakeLists.txt | 3 + tests/unit/app/StopperTests.cpp | 14 +- tests/unit/etl/ETLStateTests.cpp | 1 + tests/unit/etl/GrpcSourceTests.cpp | 7 +- tests/unit/etl/LoadBalancerTests.cpp | 29 +- tests/unit/etl/SourceImplTests.cpp | 4 +- tests/unit/etlng/ForwardingSourceTests.cpp | 196 +++++ tests/unit/etlng/LoadBalancerTests.cpp | 821 +++++++++++++++++++ tests/unit/etlng/SourceImplTests.cpp | 223 +++++ tests/unit/rpc/ForwardingProxyTests.cpp | 6 +- tests/unit/rpc/RPCEngineTests.cpp | 61 +- tests/unit/rpc/handlers/AllHandlerTests.cpp | 2 +- tests/unit/rpc/handlers/ServerInfoTests.cpp | 2 +- tests/unit/rpc/handlers/TxTests.cpp | 56 +- tests/unit/web/RPCServerHandlerTests.cpp | 10 +- tests/unit/web/ng/RPCServerHandlerTests.cpp | 2 +- 57 files changed, 3473 insertions(+), 277 deletions(-) create mode 100644 src/etlng/ETLService.hpp create mode 100644 src/etlng/ETLServiceInterface.hpp create mode 100644 src/etlng/LoadBalancer.cpp create mode 100644 src/etlng/LoadBalancer.hpp create mode 100644 src/etlng/Source.cpp create mode 100644 src/etlng/Source.hpp create mode 100644 src/etlng/impl/ForwardingSource.cpp create mode 100644 src/etlng/impl/ForwardingSource.hpp create mode 100644 src/etlng/impl/SourceImpl.hpp create mode 100644 tests/common/util/MockSourceNg.hpp create mode 100644 tests/unit/etlng/ForwardingSourceTests.cpp create mode 100644 tests/unit/etlng/LoadBalancerTests.cpp create mode 100644 tests/unit/etlng/SourceImplTests.cpp diff --git a/src/app/ClioApplication.cpp b/src/app/ClioApplication.cpp index e68a2797a..6c40900ac 100644 --- a/src/app/ClioApplication.cpp +++ b/src/app/ClioApplication.cpp @@ -27,6 +27,8 @@ #include "etl/ETLService.hpp" #include "etl/LoadBalancer.hpp" #include "etl/NetworkValidatedLedgers.hpp" +#include "etlng/LoadBalancer.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "feed/SubscriptionManager.hpp" #include "migration/MigrationInspectorFactory.hpp" #include "rpc/Counters.hpp" @@ -130,7 +132,12 @@ ClioApplication::run(bool const useNgWebServer) // ETL uses the balancer to extract data. // The server uses the balancer to forward RPCs to a rippled node. // The balancer itself publishes to streams (transactions_proposed and accounts_proposed) - auto balancer = etl::LoadBalancer::makeLoadBalancer(config_, ioc, backend, subscriptions, ledgers); + auto balancer = [&] -> std::shared_ptr { + if (config_.get("__ng_etl")) + return etlng::LoadBalancer::makeLoadBalancer(config_, ioc, backend, subscriptions, ledgers); + + return etl::LoadBalancer::makeLoadBalancer(config_, ioc, backend, subscriptions, ledgers); + }(); // ETL is responsible for writing and publishing to streams. In read-only mode, ETL only publishes auto etl = etl::ETLService::makeETLService(config_, ioc, backend, subscriptions, balancer, ledgers); @@ -142,12 +149,12 @@ ClioApplication::run(bool const useNgWebServer) config_, backend, subscriptions, balancer, etl, amendmentCenter, counters ); - using RPCEngineType = rpc::RPCEngine; + using RPCEngineType = rpc::RPCEngine; auto const rpcEngine = RPCEngineType::makeRPCEngine(config_, backend, balancer, dosGuard, workQueue, counters, handlerProvider); if (useNgWebServer or config_.get("server.__ng_web_server")) { - web::ng::RPCServerHandler handler{config_, backend, rpcEngine, etl}; + web::ng::RPCServerHandler handler{config_, backend, rpcEngine, etl}; auto expectedAdminVerifier = web::makeAdminVerificationStrategy(config_); if (not expectedAdminVerifier.has_value()) { @@ -188,8 +195,7 @@ ClioApplication::run(bool const useNgWebServer) } // Init the web server - auto handler = - std::make_shared>(config_, backend, rpcEngine, etl); + auto handler = std::make_shared>(config_, backend, rpcEngine, etl); auto const httpServer = web::makeHttpServer(config_, ioc, dosGuard, handler); diff --git a/src/app/Stopper.hpp b/src/app/Stopper.hpp index b4faa1377..daba1164e 100644 --- a/src/app/Stopper.hpp +++ b/src/app/Stopper.hpp @@ -20,8 +20,8 @@ #pragma once #include "data/BackendInterface.hpp" -#include "etl/ETLService.hpp" -#include "etl/LoadBalancer.hpp" +#include "etlng/ETLServiceInterface.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp" #include "util/CoroutineGroup.hpp" #include "util/log/Logger.hpp" @@ -74,15 +74,12 @@ public: * @param ioc The io_context to stop. * @return The callback to be called on application stop. */ - template < - web::ng::SomeServer ServerType, - etl::SomeLoadBalancer LoadBalancerType, - etl::SomeETLService ETLServiceType> + template static std::function makeOnStopCallback( ServerType& server, - LoadBalancerType& balancer, - ETLServiceType& etl, + etlng::LoadBalancerInterface& balancer, + etlng::ETLServiceInterface& etl, feed::SubscriptionManagerInterface& subscriptions, data::BackendInterface& backend, boost::asio::io_context& ioc diff --git a/src/etl/ETLService.cpp b/src/etl/ETLService.cpp index 6eca76339..a7207c99b 100644 --- a/src/etl/ETLService.cpp +++ b/src/etl/ETLService.cpp @@ -22,6 +22,7 @@ #include "data/BackendInterface.hpp" #include "etl/CorruptionDetector.hpp" #include "etl/NetworkValidatedLedgersInterface.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp" #include "util/Assert.hpp" #include "util/Constants.hpp" @@ -43,6 +44,7 @@ #include namespace etl { + // Database must be populated when this starts std::optional ETLService::runETLPipeline(uint32_t startSequence, uint32_t numExtractors) @@ -265,7 +267,7 @@ ETLService::ETLService( boost::asio::io_context& ioc, std::shared_ptr backend, std::shared_ptr subscriptions, - std::shared_ptr balancer, + std::shared_ptr balancer, std::shared_ptr ledgers ) : backend_(backend) diff --git a/src/etl/ETLService.hpp b/src/etl/ETLService.hpp index d1d865475..ea8b7852d 100644 --- a/src/etl/ETLService.hpp +++ b/src/etl/ETLService.hpp @@ -32,7 +32,12 @@ #include "etl/impl/LedgerLoader.hpp" #include "etl/impl/LedgerPublisher.hpp" #include "etl/impl/Transformer.hpp" +#include "etlng/ETLService.hpp" +#include "etlng/ETLServiceInterface.hpp" +#include "etlng/LoadBalancer.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp" +#include "util/Assert.hpp" #include "util/log/Logger.hpp" #include @@ -41,7 +46,6 @@ #include #include -#include #include #include #include @@ -81,14 +85,13 @@ concept SomeETLService = std::derived_from; * the others will fall back to monitoring/publishing. In this sense, this class dynamically transitions from monitoring * to writing and from writing to monitoring, based on the activity of other processes running on different machines. */ -class ETLService : public ETLServiceTag { +class ETLService : public etlng::ETLServiceInterface, ETLServiceTag { // TODO: make these template parameters in ETLService - using LoadBalancerType = LoadBalancer; using DataPipeType = etl::impl::ExtractionDataPipe; using CacheLoaderType = etl::CacheLoader<>; - using LedgerFetcherType = etl::impl::LedgerFetcher; + using LedgerFetcherType = etl::impl::LedgerFetcher; using ExtractorType = etl::impl::Extractor; - using LedgerLoaderType = etl::impl::LedgerLoader; + using LedgerLoaderType = etl::impl::LedgerLoader; using LedgerPublisherType = etl::impl::LedgerPublisher; using AmendmentBlockHandlerType = etl::impl::AmendmentBlockHandler; using TransformerType = @@ -97,7 +100,7 @@ class ETLService : public ETLServiceTag { util::Logger log_{"ETL"}; std::shared_ptr backend_; - std::shared_ptr loadBalancer_; + std::shared_ptr loadBalancer_; std::shared_ptr networkValidatedLedgers_; std::uint32_t extractorThreads_ = 1; @@ -132,7 +135,7 @@ public: boost::asio::io_context& ioc, std::shared_ptr backend, std::shared_ptr subscriptions, - std::shared_ptr balancer, + std::shared_ptr balancer, std::shared_ptr ledgers ); @@ -154,20 +157,33 @@ public: * @param ledgers The network validated ledgers datastructure * @return A shared pointer to a new instance of ETLService */ - static std::shared_ptr + static std::shared_ptr makeETLService( util::config::ClioConfigDefinition const& config, boost::asio::io_context& ioc, std::shared_ptr backend, std::shared_ptr subscriptions, - std::shared_ptr balancer, + std::shared_ptr balancer, std::shared_ptr ledgers ) { - auto etl = std::make_shared(config, ioc, backend, subscriptions, balancer, ledgers); - etl->run(); + std::shared_ptr ret; - return etl; + if (config.get("__ng_etl")) { + ASSERT( + std::dynamic_pointer_cast(balancer), + "LoadBalancer type must be etlng::LoadBalancer" + ); + ret = std::make_shared(config, backend, subscriptions, balancer, ledgers); + } else { + ASSERT( + std::dynamic_pointer_cast(balancer), "LoadBalancer type must be etl::LoadBalancer" + ); + ret = std::make_shared(config, ioc, backend, subscriptions, balancer, ledgers); + } + + ret->run(); + return ret; } /** @@ -184,7 +200,7 @@ public: * @note This method blocks until the ETL service has stopped. */ void - stop() + stop() override { LOG(log_.info()) << "Stop called"; @@ -203,7 +219,7 @@ public: * @return Time passed since last ledger close */ std::uint32_t - lastCloseAgeSeconds() const + lastCloseAgeSeconds() const override { return ledgerPublisher_.lastCloseAgeSeconds(); } @@ -214,7 +230,7 @@ public: * @return true if currently amendment blocked; false otherwise */ bool - isAmendmentBlocked() const + isAmendmentBlocked() const override { return state_.isAmendmentBlocked; } @@ -225,7 +241,7 @@ public: * @return true if corruption of DB was detected and cache was stopped. */ bool - isCorruptionDetected() const + isCorruptionDetected() const override { return state_.isCorruptionDetected; } @@ -236,7 +252,7 @@ public: * @return The state of ETL as a JSON object */ boost::json::object - getInfo() const + getInfo() const override { boost::json::object result; @@ -254,11 +270,17 @@ public: * @return The etl nodes' state, nullopt if etl nodes are not connected */ std::optional - getETLState() const noexcept + getETLState() const noexcept override { return loadBalancer_->getETLState(); } + /** + * @brief Start all components to run ETL service. + */ + void + run() override; + private: /** * @brief Run the ETL pipeline. @@ -325,12 +347,6 @@ private: return numMarkers_; } - /** - * @brief Start all components to run ETL service. - */ - void - run(); - /** * @brief Spawn the worker thread and start monitoring. */ diff --git a/src/etl/LoadBalancer.cpp b/src/etl/LoadBalancer.cpp index 7dc0d1925..a26c5bb58 100644 --- a/src/etl/LoadBalancer.cpp +++ b/src/etl/LoadBalancer.cpp @@ -23,6 +23,7 @@ #include "etl/ETLState.hpp" #include "etl/NetworkValidatedLedgersInterface.hpp" #include "etl/Source.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp" #include "rpc/Errors.hpp" #include "util/Assert.hpp" @@ -59,7 +60,7 @@ using namespace util::config; namespace etl { -std::shared_ptr +std::shared_ptr LoadBalancer::makeLoadBalancer( ClioConfigDefinition const& config, boost::asio::io_context& ioc, @@ -175,12 +176,12 @@ LoadBalancer::~LoadBalancer() } std::vector -LoadBalancer::loadInitialLedger(uint32_t sequence, bool cacheOnly, std::chrono::steady_clock::duration retryAfter) +LoadBalancer::loadInitialLedger(uint32_t sequence, std::chrono::steady_clock::duration retryAfter) { std::vector response; execute( - [this, &response, &sequence, cacheOnly](auto& source) { - auto [data, res] = source->loadInitialLedger(sequence, downloadRanges_, cacheOnly); + [this, &response, &sequence](auto& source) { + auto [data, res] = source->loadInitialLedger(sequence, downloadRanges_); if (!res) { LOG(log_.error()) << "Failed to download initial ledger." diff --git a/src/etl/LoadBalancer.hpp b/src/etl/LoadBalancer.hpp index cfe01d019..169636685 100644 --- a/src/etl/LoadBalancer.hpp +++ b/src/etl/LoadBalancer.hpp @@ -23,8 +23,11 @@ #include "etl/ETLState.hpp" #include "etl/NetworkValidatedLedgersInterface.hpp" #include "etl/Source.hpp" +#include "etlng/InitialLoadObserverInterface.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp" #include "rpc/Errors.hpp" +#include "util/Assert.hpp" #include "util/Mutex.hpp" #include "util/ResponseExpirationCache.hpp" #include "util/log/Logger.hpp" @@ -41,13 +44,13 @@ #include #include -#include #include #include #include #include #include #include +#include #include namespace etl { @@ -69,7 +72,7 @@ concept SomeLoadBalancer = std::derived_from; * which ledgers have been validated by the network, and the range of ledgers each etl source has). This class also * allows requests for ledger data to be load balanced across all possible ETL sources. */ -class LoadBalancer : public LoadBalancerTag { +class LoadBalancer : public etlng::LoadBalancerInterface, LoadBalancerTag { public: using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject; using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse; @@ -133,7 +136,7 @@ public: * @param sourceFactory A factory function to create a source * @return A shared pointer to a new instance of LoadBalancer */ - static std::shared_ptr + static std::shared_ptr makeLoadBalancer( util::config::ClioConfigDefinition const& config, boost::asio::io_context& ioc, @@ -150,16 +153,32 @@ public: * @note This function will retry indefinitely until the ledger is downloaded. * * @param sequence Sequence of ledger to download - * @param cacheOnly Whether to only write to cache and not to the DB; defaults to false + * @param retryAfter Time to wait between retries (2 seconds by default) + * @return A std::vector The ledger data + */ + std::vector + loadInitialLedger(uint32_t sequence, std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}) + override; + + /** + * @brief Load the initial ledger, writing data to the queue. + * @note This function will retry indefinitely until the ledger is downloaded. + * + * @param sequence Sequence of ledger to download + * @param observer The observer to notify of progress * @param retryAfter Time to wait between retries (2 seconds by default) * @return A std::vector The ledger data */ std::vector loadInitialLedger( - uint32_t sequence, - bool cacheOnly = false, - std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2} - ); + [[maybe_unused]] uint32_t sequence, + [[maybe_unused]] etlng::InitialLoadObserverInterface& observer, + [[maybe_unused]] std::chrono::steady_clock::duration retryAfter + ) override + { + ASSERT(false, "Not available for old ETL"); + std::unreachable(); + } /** * @brief Fetch data for a specific ledger. @@ -180,7 +199,7 @@ public: bool getObjects, bool getObjectNeighbors, std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2} - ); + ) override; /** * @brief Represent the state of this load balancer as a JSON object @@ -188,7 +207,7 @@ public: * @return JSON representation of the state of this load balancer. */ boost::json::value - toJson() const; + toJson() const override; /** * @brief Forward a JSON RPC request to a randomly selected rippled node. @@ -205,14 +224,14 @@ public: std::optional const& clientIp, bool isAdmin, boost::asio::yield_context yield - ); + ) override; /** * @brief Return state of ETL nodes. * @return ETL state, nullopt if etl nodes not available */ std::optional - getETLState() noexcept; + getETLState() noexcept override; /** * @brief Stop the load balancer. This will stop all subscription sources. @@ -221,7 +240,7 @@ public: * @param yield The coroutine context */ void - stop(boost::asio::yield_context yield); + stop(boost::asio::yield_context yield) override; private: /** diff --git a/src/etl/NetworkValidatedLedgersInterface.hpp b/src/etl/NetworkValidatedLedgersInterface.hpp index d2ab9d12b..067407c8c 100644 --- a/src/etl/NetworkValidatedLedgersInterface.hpp +++ b/src/etl/NetworkValidatedLedgersInterface.hpp @@ -26,6 +26,7 @@ #include #include + namespace etl { /** diff --git a/src/etl/Source.hpp b/src/etl/Source.hpp index 91d9bf817..6ff7e7285 100644 --- a/src/etl/Source.hpp +++ b/src/etl/Source.hpp @@ -23,8 +23,6 @@ #include "etl/NetworkValidatedLedgersInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp" #include "rpc/Errors.hpp" -#include "util/log/Logger.hpp" -#include "util/newconfig/ConfigDefinition.hpp" #include "util/newconfig/ObjectView.hpp" #include @@ -130,11 +128,10 @@ public: * * @param sequence Sequence of the ledger to download * @param numMarkers Number of markers to generate for async calls - * @param cacheOnly Only insert into cache, not the DB; defaults to false * @return A std::pair of the data and a bool indicating whether the download was successful */ virtual std::pair, bool> - loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, bool cacheOnly = false) = 0; + loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers) = 0; /** * @brief Forward a request to rippled. diff --git a/src/etl/impl/GrpcSource.cpp b/src/etl/impl/GrpcSource.cpp index 29be5b224..539cdf481 100644 --- a/src/etl/impl/GrpcSource.cpp +++ b/src/etl/impl/GrpcSource.cpp @@ -98,7 +98,7 @@ GrpcSource::fetchLedger(uint32_t sequence, bool getObjects, bool getObjectNeighb } std::pair, bool> -GrpcSource::loadInitialLedger(uint32_t const sequence, uint32_t const numMarkers, bool const cacheOnly) +GrpcSource::loadInitialLedger(uint32_t const sequence, uint32_t const numMarkers) { if (!stub_) return {{}, false}; @@ -130,7 +130,7 @@ GrpcSource::loadInitialLedger(uint32_t const sequence, uint32_t const numMarkers LOG(log_.trace()) << "Marker prefix = " << ptr->getMarkerPrefix(); - auto result = ptr->process(stub_, cq, *backend_, abort, cacheOnly); + auto result = ptr->process(stub_, cq, *backend_, abort); if (result != etl::impl::AsyncCallData::CallStatus::MORE) { ++numFinished; LOG(log_.debug()) << "Finished a marker. Current number of finished = " << numFinished; diff --git a/src/etl/impl/GrpcSource.hpp b/src/etl/impl/GrpcSource.hpp index 5d41f193d..248f4191d 100644 --- a/src/etl/impl/GrpcSource.hpp +++ b/src/etl/impl/GrpcSource.hpp @@ -60,11 +60,10 @@ public: * * @param sequence Sequence of the ledger to download * @param numMarkers Number of markers to generate for async calls - * @param cacheOnly Only insert into cache, not the DB; defaults to false * @return A std::pair of the data and a bool indicating whether the download was successful */ std::pair, bool> - loadInitialLedger(uint32_t sequence, uint32_t numMarkers, bool cacheOnly = false); + loadInitialLedger(uint32_t sequence, uint32_t numMarkers); }; } // namespace etl::impl diff --git a/src/etl/impl/LedgerFetcher.hpp b/src/etl/impl/LedgerFetcher.hpp index 9d8a0df88..acb792ac2 100644 --- a/src/etl/impl/LedgerFetcher.hpp +++ b/src/etl/impl/LedgerFetcher.hpp @@ -20,6 +20,8 @@ #pragma once #include "data/BackendInterface.hpp" +#include "etl/LedgerFetcherInterface.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "util/log/Logger.hpp" #include @@ -34,22 +36,18 @@ namespace etl::impl { /** * @brief GRPC Ledger data fetcher */ -template -class LedgerFetcher { -public: - using OptionalGetLedgerResponseType = typename LoadBalancerType::OptionalGetLedgerResponseType; - +class LedgerFetcher : public LedgerFetcherInterface { private: util::Logger log_{"ETL"}; std::shared_ptr backend_; - std::shared_ptr loadBalancer_; + std::shared_ptr loadBalancer_; public: /** * @brief Create an instance of the fetcher */ - LedgerFetcher(std::shared_ptr backend, std::shared_ptr balancer) + LedgerFetcher(std::shared_ptr backend, std::shared_ptr balancer) : backend_(std::move(backend)), loadBalancer_(std::move(balancer)) { } @@ -64,7 +62,7 @@ public: * @return Ledger header and transaction+metadata blobs; Empty optional if the server is shutting down */ [[nodiscard]] OptionalGetLedgerResponseType - fetchData(uint32_t sequence) + fetchData(uint32_t sequence) override { LOG(log_.debug()) << "Attempting to fetch ledger with sequence = " << sequence; @@ -84,7 +82,7 @@ public: * @return Ledger data diff between sequance and parent; Empty optional if the server is shutting down */ [[nodiscard]] OptionalGetLedgerResponseType - fetchDataAndDiff(uint32_t sequence) + fetchDataAndDiff(uint32_t sequence) override { LOG(log_.debug()) << "Attempting to fetch ledger with sequence = " << sequence; diff --git a/src/etl/impl/LedgerLoader.hpp b/src/etl/impl/LedgerLoader.hpp index 716fa8718..6be0f30e3 100644 --- a/src/etl/impl/LedgerLoader.hpp +++ b/src/etl/impl/LedgerLoader.hpp @@ -26,6 +26,7 @@ #include "etl/NFTHelpers.hpp" #include "etl/SystemState.hpp" #include "etl/impl/LedgerFetcher.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "util/Assert.hpp" #include "util/LedgerUtils.hpp" #include "util/Profiler.hpp" @@ -65,18 +66,18 @@ namespace etl::impl { /** * @brief Loads ledger data into the DB */ -template +template class LedgerLoader { public: - using GetLedgerResponseType = typename LoadBalancerType::GetLedgerResponseType; - using OptionalGetLedgerResponseType = typename LoadBalancerType::OptionalGetLedgerResponseType; - using RawLedgerObjectType = typename LoadBalancerType::RawLedgerObjectType; + using GetLedgerResponseType = etlng::LoadBalancerInterface::GetLedgerResponseType; + using OptionalGetLedgerResponseType = etlng::LoadBalancerInterface::OptionalGetLedgerResponseType; + using RawLedgerObjectType = etlng::LoadBalancerInterface::RawLedgerObjectType; private: util::Logger log_{"ETL"}; std::shared_ptr backend_; - std::shared_ptr loadBalancer_; + std::shared_ptr loadBalancer_; std::reference_wrapper fetcher_; std::reference_wrapper state_; // shared state for ETL @@ -86,7 +87,7 @@ public: */ LedgerLoader( std::shared_ptr backend, - std::shared_ptr balancer, + std::shared_ptr balancer, LedgerFetcherType& fetcher, SystemState const& state ) diff --git a/src/etl/impl/SourceImpl.hpp b/src/etl/impl/SourceImpl.hpp index 3f2f25a33..a2b4175e0 100644 --- a/src/etl/impl/SourceImpl.hpp +++ b/src/etl/impl/SourceImpl.hpp @@ -202,9 +202,9 @@ public: * @return A std::pair of the data and a bool indicating whether the download was successful */ std::pair, bool> - loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, bool cacheOnly = false) final + loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers) final { - return grpcSource_.loadInitialLedger(sequence, numMarkers, cacheOnly); + return grpcSource_.loadInitialLedger(sequence, numMarkers); } /** diff --git a/src/etlng/CMakeLists.txt b/src/etlng/CMakeLists.txt index d0f2854e9..49e443a34 100644 --- a/src/etlng/CMakeLists.txt +++ b/src/etlng/CMakeLists.txt @@ -2,10 +2,13 @@ add_library(clio_etlng) target_sources( clio_etlng - PRIVATE impl/AmendmentBlockHandler.cpp + PRIVATE LoadBalancer.cpp + Source.cpp + impl/AmendmentBlockHandler.cpp impl/AsyncGrpcCall.cpp impl/Extraction.cpp impl/GrpcSource.cpp + impl/ForwardingSource.cpp impl/Loading.cpp impl/Monitor.cpp impl/TaskManager.cpp diff --git a/src/etlng/ETLService.hpp b/src/etlng/ETLService.hpp new file mode 100644 index 000000000..960d6e456 --- /dev/null +++ b/src/etlng/ETLService.hpp @@ -0,0 +1,283 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "data/BackendInterface.hpp" +#include "data/LedgerCache.hpp" +#include "data/Types.hpp" +#include "etl/CacheLoader.hpp" +#include "etl/ETLState.hpp" +#include "etl/LedgerFetcherInterface.hpp" +#include "etl/NetworkValidatedLedgersInterface.hpp" +#include "etl/SystemState.hpp" +#include "etl/impl/AmendmentBlockHandler.hpp" +#include "etl/impl/LedgerFetcher.hpp" +#include "etl/impl/LedgerPublisher.hpp" +#include "etlng/AmendmentBlockHandlerInterface.hpp" +#include "etlng/ETLServiceInterface.hpp" +#include "etlng/ExtractorInterface.hpp" +#include "etlng/LoadBalancerInterface.hpp" +#include "etlng/impl/AmendmentBlockHandler.hpp" +#include "etlng/impl/Extraction.hpp" +#include "etlng/impl/Loading.hpp" +#include "etlng/impl/Monitor.hpp" +#include "etlng/impl/Registry.hpp" +#include "etlng/impl/Scheduling.hpp" +#include "etlng/impl/TaskManager.hpp" +#include "etlng/impl/ext/Cache.hpp" +#include "etlng/impl/ext/Core.hpp" +#include "etlng/impl/ext/NFT.hpp" +#include "etlng/impl/ext/Successor.hpp" +#include "feed/SubscriptionManagerInterface.hpp" +#include "util/Assert.hpp" +#include "util/Profiler.hpp" +#include "util/async/context/BasicExecutionContext.hpp" +#include "util/config/Config.hpp" +#include "util/log/Logger.hpp" +#include "util/newconfig/ConfigDefinition.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace etlng { + +/** + * @brief This class is responsible for continuously extracting data from a p2p node, and writing that data to the + * databases. + * + * Usually, multiple different processes share access to the same network accessible databases, in which case only one + * such process is performing ETL and writing to the database. The other processes simply monitor the database for new + * ledgers, and publish those ledgers to the various subscription streams. If a monitoring process determines that the + * ETL writer has failed (no new ledgers written for some time), the process will attempt to become the ETL writer. + * + * If there are multiple monitoring processes that try to become the ETL writer at the same time, one will win out, and + * the others will fall back to monitoring/publishing. In this sense, this class dynamically transitions from monitoring + * to writing and from writing to monitoring, based on the activity of other processes running on different machines. + */ +class ETLService : public ETLServiceInterface { + util::Logger log_{"ETL"}; + + std::shared_ptr backend_; + std::shared_ptr subscriptions_; + std::shared_ptr balancer_; + std::shared_ptr ledgers_; + std::shared_ptr> cacheLoader_; + + std::shared_ptr fetcher_; + std::shared_ptr extractor_; + + etl::SystemState state_; + util::async::CoroExecutionContext ctx_{8}; + + std::shared_ptr amendmentBlockHandler_; + std::shared_ptr loader_; + + std::optional> mainLoop_; + +public: + /** + * @brief Create an instance of ETLService. + * + * @param config The configuration to use + * @param backend BackendInterface implementation + * @param subscriptions Subscription manager + * @param balancer Load balancer to use + * @param ledgers The network validated ledgers datastructure + */ + ETLService( + util::config::ClioConfigDefinition const& config, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr balancer, + std::shared_ptr ledgers + ) + : backend_(std::move(backend)) + , subscriptions_(std::move(subscriptions)) + , balancer_(std::move(balancer)) + , ledgers_(std::move(ledgers)) + , cacheLoader_(std::make_shared>(config, backend_, backend_->cache())) + , fetcher_(std::make_shared(backend_, balancer_)) + , extractor_(std::make_shared(fetcher_)) + , amendmentBlockHandler_(std::make_shared(ctx_, state_)) + , loader_(std::make_shared( + backend_, + fetcher_, + impl::makeRegistry( + impl::CacheExt{backend_->cache()}, + impl::CoreExt{backend_}, + impl::SuccessorExt{backend_, backend_->cache()}, + impl::NFTExt{backend_} + ), + amendmentBlockHandler_ + )) + { + LOG(log_.info()) << "Creating ETLng..."; + } + + ~ETLService() override + { + LOG(log_.debug()) << "Stopping ETLng"; + } + + void + run() override + { + LOG(log_.info()) << "run() in ETLng..."; + + mainLoop_.emplace(ctx_.execute([this] { + auto const rng = loadInitialLedgerIfNeeded(); + + LOG(log_.info()) << "Waiting for next ledger to be validated by network..."; + std::optional mostRecentValidated = ledgers_->getMostRecent(); + + if (not mostRecentValidated) { + LOG(log_.info()) << "The wait for the next validated ledger has been aborted. " + "Exiting monitor loop"; + return; + } + + ASSERT(rng.has_value(), "Ledger range can't be null"); + auto const nextSequence = rng->maxSequence + 1; + + LOG(log_.debug()) << "Database is populated. Starting monitor loop. sequence = " << nextSequence; + + auto scheduler = impl::makeScheduler(impl::ForwardScheduler{*ledgers_, nextSequence} + // impl::BackfillScheduler{nextSequence - 1, nextSequence - 1000}, + // TODO lift limit and start with rng.minSeq + ); + + auto man = impl::TaskManager(ctx_, *scheduler, *extractor_, *loader_); + + // TODO: figure out this: std::make_shared(backend_, ledgers_, nextSequence) + man.run({}); // TODO: needs to be interruptable and fill out settings + })); + } + + void + stop() override + { + LOG(log_.info()) << "Stop called"; + // TODO: stop the service correctly + } + + boost::json::object + getInfo() const override + { + // TODO + return {{"ok", true}}; + } + + bool + isAmendmentBlocked() const override + { + // TODO + return false; + } + + bool + isCorruptionDetected() const override + { + // TODO + return false; + } + + std::optional + getETLState() const override + { + // TODO + return std::nullopt; + } + + std::uint32_t + lastCloseAgeSeconds() const override + { + // TODO + return 0; + } + +private: + // TODO: this better be std::expected + std::optional + loadInitialLedgerIfNeeded() + { + if (auto rng = backend_->hardFetchLedgerRangeNoThrow(); not rng.has_value()) { + LOG(log_.info()) << "Database is empty. Will download a ledger from the network."; + + try { + LOG(log_.info()) << "Waiting for next ledger to be validated by network..."; + if (auto const mostRecentValidated = ledgers_->getMostRecent(); mostRecentValidated.has_value()) { + auto const seq = *mostRecentValidated; + LOG(log_.info()) << "Ledger " << seq << " has been validated. Downloading... "; + + auto [ledger, timeDiff] = ::util::timed>([this, seq]() { + return extractor_->extractLedgerOnly(seq).and_then([this, seq](auto&& data) { + // TODO: loadInitialLedger in balancer should be called fetchEdgeKeys or similar + data.edgeKeys = balancer_->loadInitialLedger(seq, *loader_); + + // TODO: this should be interruptable for graceful shutdown + return loader_->loadInitialLedger(data); + }); + }); + + LOG(log_.debug()) << "Time to download and store ledger = " << timeDiff; + LOG(log_.info()) << "Finished loadInitialLedger. cache size = " << backend_->cache().size(); + + if (ledger.has_value()) + return backend_->hardFetchLedgerRangeNoThrow(); + + LOG(log_.error()) << "Failed to load initial ledger. Exiting monitor loop"; + } else { + LOG(log_.info()) << "The wait for the next validated ledger has been aborted. " + "Exiting monitor loop"; + } + } catch (std::runtime_error const& e) { + LOG(log_.fatal()) << "Failed to load initial ledger: " << e.what(); + amendmentBlockHandler_->notifyAmendmentBlocked(); + } + } else { + LOG(log_.info()) << "Database already populated. Picking up from the tip of history"; + cacheLoader_->load(rng->maxSequence); + + return rng; + } + + return std::nullopt; + } +}; +} // namespace etlng diff --git a/src/etlng/ETLServiceInterface.hpp b/src/etlng/ETLServiceInterface.hpp new file mode 100644 index 000000000..7f0b2fc2b --- /dev/null +++ b/src/etlng/ETLServiceInterface.hpp @@ -0,0 +1,92 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "etl/ETLState.hpp" + +#include + +#include +#include + +namespace etlng { + +/** + * @brief This is a base class for any ETL service implementations. + * @note A ETL service is responsible for continuously extracting data from a p2p node, and writing that data to the + * databases. + */ +struct ETLServiceInterface { + virtual ~ETLServiceInterface() = default; + + /** + * @brief Start all components to run ETL service. + */ + virtual void + run() = 0; + + /** + * @brief Stop the ETL service. + * @note This method blocks until the ETL service has stopped. + */ + virtual void + stop() = 0; + + /** + * @brief Get state of ETL as a JSON object + * + * @return The state of ETL as a JSON object + */ + [[nodiscard]] virtual boost::json::object + getInfo() const = 0; + + /** + * @brief Check for the amendment blocked state. + * + * @return true if currently amendment blocked; false otherwise + */ + [[nodiscard]] virtual bool + isAmendmentBlocked() const = 0; + + /** + * @brief Check whether Clio detected DB corruptions. + * + * @return true if corruption of DB was detected and cache was stopped. + */ + [[nodiscard]] virtual bool + isCorruptionDetected() const = 0; + + /** + * @brief Get the etl nodes' state + * @return The etl nodes' state, nullopt if etl nodes are not connected + */ + [[nodiscard]] virtual std::optional + getETLState() const = 0; + + /** + * @brief Get time passed since last ledger close, in seconds. + * + * @return Time passed since last ledger close + */ + [[nodiscard]] virtual std::uint32_t + lastCloseAgeSeconds() const = 0; +}; + +} // namespace etlng diff --git a/src/etlng/LoadBalancer.cpp b/src/etlng/LoadBalancer.cpp new file mode 100644 index 000000000..fa34af78e --- /dev/null +++ b/src/etlng/LoadBalancer.cpp @@ -0,0 +1,371 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2023, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "etlng/LoadBalancer.hpp" + +#include "data/BackendInterface.hpp" +#include "etl/ETLState.hpp" +#include "etl/NetworkValidatedLedgersInterface.hpp" +#include "etlng/InitialLoadObserverInterface.hpp" +#include "etlng/LoadBalancerInterface.hpp" +#include "etlng/Source.hpp" +#include "feed/SubscriptionManagerInterface.hpp" +#include "rpc/Errors.hpp" +#include "util/Assert.hpp" +#include "util/CoroutineGroup.hpp" +#include "util/Random.hpp" +#include "util/ResponseExpirationCache.hpp" +#include "util/log/Logger.hpp" +#include "util/newconfig/ArrayView.hpp" +#include "util/newconfig/ConfigDefinition.hpp" +#include "util/newconfig/ObjectView.hpp" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace util::config; + +namespace etlng { + +std::shared_ptr +LoadBalancer::makeLoadBalancer( + ClioConfigDefinition const& config, + boost::asio::io_context& ioc, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr validatedLedgers, + SourceFactory sourceFactory +) +{ + return std::make_shared( + config, ioc, std::move(backend), std::move(subscriptions), std::move(validatedLedgers), std::move(sourceFactory) + ); +} + +LoadBalancer::LoadBalancer( + ClioConfigDefinition const& config, + boost::asio::io_context& ioc, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr validatedLedgers, + SourceFactory sourceFactory +) +{ + auto const forwardingCacheTimeout = config.get("forwarding.cache_timeout"); + if (forwardingCacheTimeout > 0.f) { + forwardingCache_ = util::ResponseExpirationCache{ + util::config::ClioConfigDefinition::toMilliseconds(forwardingCacheTimeout), + {"server_info", "server_state", "server_definitions", "fee", "ledger_closed"} + }; + } + + auto const numMarkers = config.getValueView("num_markers"); + if (numMarkers.hasValue()) { + auto const value = numMarkers.asIntType(); + downloadRanges_ = value; + } else if (backend->fetchLedgerRange()) { + downloadRanges_ = 4; + } + + auto const allowNoEtl = config.get("allow_no_etl"); + + auto const checkOnETLFailure = [this, allowNoEtl](std::string const& log) { + LOG(log_.warn()) << log; + + if (!allowNoEtl) { + LOG(log_.error()) << "Set allow_no_etl as true in config to allow clio run without valid ETL sources."; + throw std::logic_error("ETL configuration error."); + } + }; + + auto const forwardingTimeout = + ClioConfigDefinition::toMilliseconds(config.get("forwarding.request_timeout")); + auto const etlArray = config.getArray("etl_sources"); + for (auto it = etlArray.begin(); it != etlArray.end(); ++it) { + auto source = sourceFactory( + *it, + ioc, + subscriptions, + validatedLedgers, + forwardingTimeout, + [this]() { + if (not hasForwardingSource_.lock().get()) + chooseForwardingSource(); + }, + [this](bool wasForwarding) { + if (wasForwarding) + chooseForwardingSource(); + }, + [this]() { + if (forwardingCache_.has_value()) + forwardingCache_->invalidate(); + } + ); + + // checking etl node validity + auto const stateOpt = etl::ETLState::fetchETLStateFromSource(*source); + + if (!stateOpt) { + LOG(log_.warn()) << "Failed to fetch ETL state from source = " << source->toString() + << " Please check the configuration and network"; + } else if (etlState_ && etlState_->networkID && stateOpt->networkID && + etlState_->networkID != stateOpt->networkID) { + checkOnETLFailure(fmt::format( + "ETL sources must be on the same network. Source network id = {} does not match others network id = {}", + *(stateOpt->networkID), + *(etlState_->networkID) + )); + } else { + etlState_ = stateOpt; + } + + sources_.push_back(std::move(source)); + LOG(log_.info()) << "Added etl source - " << sources_.back()->toString(); + } + + if (!etlState_) + checkOnETLFailure("Failed to fetch ETL state from any source. Please check the configuration and network"); + + if (sources_.empty()) + checkOnETLFailure("No ETL sources configured. Please check the configuration"); + + // This is made separate from source creation to prevent UB in case one of the sources will call + // chooseForwardingSource while we are still filling the sources_ vector + for (auto const& source : sources_) { + source->run(); + } +} + +LoadBalancer::~LoadBalancer() +{ + sources_.clear(); +} + +std::vector +LoadBalancer::loadInitialLedger( + uint32_t sequence, + etlng::InitialLoadObserverInterface& loadObserver, + std::chrono::steady_clock::duration retryAfter +) +{ + std::vector response; + execute( + [this, &response, &sequence, &loadObserver](auto& source) { + auto [data, res] = source->loadInitialLedger(sequence, downloadRanges_, loadObserver); + + if (!res) { + LOG(log_.error()) << "Failed to download initial ledger." + << " Sequence = " << sequence << " source = " << source->toString(); + } else { + response = std::move(data); + } + + return res; + }, + sequence, + retryAfter + ); + return response; +} + +LoadBalancer::OptionalGetLedgerResponseType +LoadBalancer::fetchLedger( + uint32_t ledgerSequence, + bool getObjects, + bool getObjectNeighbors, + std::chrono::steady_clock::duration retryAfter +) +{ + GetLedgerResponseType response; + execute( + [&response, ledgerSequence, getObjects, getObjectNeighbors, log = log_](auto& source) { + auto [status, data] = source->fetchLedger(ledgerSequence, getObjects, getObjectNeighbors); + response = std::move(data); + if (status.ok() && response.validated()) { + LOG(log.info()) << "Successfully fetched ledger = " << ledgerSequence + << " from source = " << source->toString(); + return true; + } + + LOG(log.warn()) << "Could not fetch ledger " << ledgerSequence << ", Reply: " << response.DebugString() + << ", error_code: " << status.error_code() << ", error_msg: " << status.error_message() + << ", source = " << source->toString(); + return false; + }, + ledgerSequence, + retryAfter + ); + return response; +} + +std::expected +LoadBalancer::forwardToRippled( + boost::json::object const& request, + std::optional const& clientIp, + bool isAdmin, + boost::asio::yield_context yield +) +{ + if (not request.contains("command")) + return std::unexpected{rpc::ClioError::RpcCommandIsMissing}; + + auto const cmd = boost::json::value_to(request.at("command")); + if (forwardingCache_) { + if (auto cachedResponse = forwardingCache_->get(cmd); cachedResponse) { + return std::move(cachedResponse).value(); + } + } + + ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests."); + std::size_t sourceIdx = util::Random::uniform(0ul, sources_.size() - 1); + + auto numAttempts = 0u; + + auto xUserValue = isAdmin ? kADMIN_FORWARDING_X_USER_VALUE : kUSER_FORWARDING_X_USER_VALUE; + + std::optional response; + rpc::ClioError error = rpc::ClioError::EtlConnectionError; + while (numAttempts < sources_.size()) { + auto res = sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield); + if (res) { + response = std::move(res).value(); + break; + } + error = std::max(error, res.error()); // Choose the best result between all sources + + sourceIdx = (sourceIdx + 1) % sources_.size(); + ++numAttempts; + } + + if (response) { + if (forwardingCache_ and not response->contains("error")) + forwardingCache_->put(cmd, *response); + return std::move(response).value(); + } + + return std::unexpected{error}; +} + +boost::json::value +LoadBalancer::toJson() const +{ + boost::json::array ret; + for (auto& src : sources_) + ret.push_back(src->toJson()); + + return ret; +} + +template +void +LoadBalancer::execute(Func f, uint32_t ledgerSequence, std::chrono::steady_clock::duration retryAfter) +{ + ASSERT(not sources_.empty(), "ETL sources must be configured to execute functions."); + size_t sourceIdx = util::Random::uniform(0ul, sources_.size() - 1); + + size_t numAttempts = 0; + + while (true) { + auto& source = sources_[sourceIdx]; + + LOG(log_.debug()) << "Attempting to execute func. ledger sequence = " << ledgerSequence + << " - source = " << source->toString(); + // Originally, it was (source->hasLedger(ledgerSequence) || true) + /* Sometimes rippled has ledger but doesn't actually know. However, + but this does NOT happen in the normal case and is safe to remove + This || true is only needed when loading full history standalone */ + if (source->hasLedger(ledgerSequence)) { + bool const res = f(source); + if (res) { + LOG(log_.debug()) << "Successfully executed func at source = " << source->toString() + << " - ledger sequence = " << ledgerSequence; + break; + } + + LOG(log_.warn()) << "Failed to execute func at source = " << source->toString() + << " - ledger sequence = " << ledgerSequence; + } else { + LOG(log_.warn()) << "Ledger not present at source = " << source->toString() + << " - ledger sequence = " << ledgerSequence; + } + sourceIdx = (sourceIdx + 1) % sources_.size(); + numAttempts++; + if (numAttempts % sources_.size() == 0) { + LOG(log_.info()) << "Ledger sequence " << ledgerSequence + << " is not yet available from any configured sources. Sleeping and trying again"; + std::this_thread::sleep_for(retryAfter); + } + } +} + +std::optional +LoadBalancer::getETLState() noexcept +{ + if (!etlState_) { + // retry ETLState fetch + etlState_ = etl::ETLState::fetchETLStateFromSource(*this); + } + return etlState_; +} + +void +LoadBalancer::stop(boost::asio::yield_context yield) +{ + util::CoroutineGroup group{yield}; + std::ranges::for_each(sources_, [&group, yield](auto& source) { + group.spawn(yield, [&source](boost::asio::yield_context innerYield) { source->stop(innerYield); }); + }); + group.asyncWait(yield); +} + +void +LoadBalancer::chooseForwardingSource() +{ + LOG(log_.info()) << "Choosing a new source to forward subscriptions"; + auto hasForwardingSourceLock = hasForwardingSource_.lock(); + hasForwardingSourceLock.get() = false; + for (auto& source : sources_) { + if (not hasForwardingSourceLock.get() and source->isConnected()) { + source->setForwarding(true); + hasForwardingSourceLock.get() = true; + } else { + source->setForwarding(false); + } + } +} + +} // namespace etlng diff --git a/src/etlng/LoadBalancer.hpp b/src/etlng/LoadBalancer.hpp new file mode 100644 index 000000000..ac01100da --- /dev/null +++ b/src/etlng/LoadBalancer.hpp @@ -0,0 +1,271 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2022, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "data/BackendInterface.hpp" +#include "etl/ETLState.hpp" +#include "etl/NetworkValidatedLedgersInterface.hpp" +#include "etlng/InitialLoadObserverInterface.hpp" +#include "etlng/LoadBalancerInterface.hpp" +#include "etlng/Source.hpp" +#include "feed/SubscriptionManagerInterface.hpp" +#include "rpc/Errors.hpp" +#include "util/Assert.hpp" +#include "util/Mutex.hpp" +#include "util/ResponseExpirationCache.hpp" +#include "util/log/Logger.hpp" +#include "util/newconfig/ConfigDefinition.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace etlng { + +/** + * @brief A tag class to help identify LoadBalancer in templated code. + */ +struct LoadBalancerTag { + virtual ~LoadBalancerTag() = default; +}; + +template +concept SomeLoadBalancer = std::derived_from; + +/** + * @brief This class is used to manage connections to transaction processing processes. + * + * This class spawns a listener for each etl source, which listens to messages on the ledgers stream (to keep track of + * which ledgers have been validated by the network, and the range of ledgers each etl source has). This class also + * allows requests for ledger data to be load balanced across all possible ETL sources. + */ +class LoadBalancer : public etlng::LoadBalancerInterface, LoadBalancerTag { +public: + using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject; + using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse; + using OptionalGetLedgerResponseType = std::optional; + +private: + static constexpr std::uint32_t kDEFAULT_DOWNLOAD_RANGES = 16; + + util::Logger log_{"ETL"}; + // Forwarding cache must be destroyed after sources because sources have a callback to invalidate cache + std::optional forwardingCache_; + std::optional forwardingXUserValue_; + + std::vector sources_; + std::optional etlState_; + std::uint32_t downloadRanges_ = + kDEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading initial ledger */ + + // Using mutext instead of atomic_bool because choosing a new source to + // forward messages should be done with a mutual exclusion otherwise there will be a race condition + util::Mutex hasForwardingSource_{false}; + +public: + /** + * @brief Value for the X-User header when forwarding admin requests + */ + static constexpr std::string_view kADMIN_FORWARDING_X_USER_VALUE = "clio_admin"; + + /** + * @brief Value for the X-User header when forwarding user requests + */ + static constexpr std::string_view kUSER_FORWARDING_X_USER_VALUE = "clio_user"; + + /** + * @brief Create an instance of the load balancer. + * + * @param config The configuration to use + * @param ioc The io_context to run on + * @param backend BackendInterface implementation + * @param subscriptions Subscription manager + * @param validatedLedgers The network validated ledgers datastructure + * @param sourceFactory A factory function to create a source + */ + LoadBalancer( + util::config::ClioConfigDefinition const& config, + boost::asio::io_context& ioc, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr validatedLedgers, + SourceFactory sourceFactory = makeSource + ); + + /** + * @brief A factory function for the load balancer. + * + * @param config The configuration to use + * @param ioc The io_context to run on + * @param backend BackendInterface implementation + * @param subscriptions Subscription manager + * @param validatedLedgers The network validated ledgers datastructure + * @param sourceFactory A factory function to create a source + * @return A shared pointer to a new instance of LoadBalancer + */ + static std::shared_ptr + makeLoadBalancer( + util::config::ClioConfigDefinition const& config, + boost::asio::io_context& ioc, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr validatedLedgers, + SourceFactory sourceFactory = makeSource + ); + + ~LoadBalancer() override; + + /** + * @brief Load the initial ledger, writing data to the queue. + * @note This function will retry indefinitely until the ledger is downloaded. + * + * @param sequence Sequence of ledger to download + * @param retryAfter Time to wait between retries (2 seconds by default) + * @return A std::vector The ledger data + */ + std::vector + loadInitialLedger( + [[maybe_unused]] uint32_t sequence, + [[maybe_unused]] std::chrono::steady_clock::duration retryAfter + ) override + { + ASSERT(false, "Not available for new ETL"); + std::unreachable(); + }; + + /** + * @brief Load the initial ledger, writing data to the queue. + * @note This function will retry indefinitely until the ledger is downloaded. + * + * @param sequence Sequence of ledger to download + * @param observer The observer to notify of progress + * @param retryAfter Time to wait between retries (2 seconds by default) + * @return A std::vector The ledger data + */ + std::vector + loadInitialLedger( + uint32_t sequence, + etlng::InitialLoadObserverInterface& observer, + std::chrono::steady_clock::duration retryAfter + ) override; + + /** + * @brief Fetch data for a specific ledger. + * + * This function will continuously try to fetch data for the specified ledger until the fetch succeeds, the ledger + * is found in the database, or the server is shutting down. + * + * @param ledgerSequence Sequence of the ledger to fetch + * @param getObjects Whether to get the account state diff between this ledger and the prior one + * @param getObjectNeighbors Whether to request object neighbors + * @param retryAfter Time to wait between retries (2 seconds by default) + * @return The extracted data, if extraction was successful. If the ledger was found + * in the database or the server is shutting down, the optional will be empty + */ + OptionalGetLedgerResponseType + fetchLedger( + uint32_t ledgerSequence, + bool getObjects, + bool getObjectNeighbors, + std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2} + ) override; + + /** + * @brief Represent the state of this load balancer as a JSON object + * + * @return JSON representation of the state of this load balancer. + */ + boost::json::value + toJson() const override; + + /** + * @brief Forward a JSON RPC request to a randomly selected rippled node. + * + * @param request JSON-RPC request to forward + * @param clientIp The IP address of the peer, if known + * @param isAdmin Whether the request is from an admin + * @param yield The coroutine context + * @return Response received from rippled node as JSON object on success or error on failure + */ + std::expected + forwardToRippled( + boost::json::object const& request, + std::optional const& clientIp, + bool isAdmin, + boost::asio::yield_context yield + ) override; + + /** + * @brief Return state of ETL nodes. + * @return ETL state, nullopt if etl nodes not available + */ + std::optional + getETLState() noexcept override; + + /** + * @brief Stop the load balancer. This will stop all subscription sources. + * @note This function will asynchronously wait for all sources to stop. + * + * @param yield The coroutine context + */ + void + stop(boost::asio::yield_context yield) override; + +private: + /** + * @brief Execute a function on a randomly selected source. + * + * @note f is a function that takes an Source as an argument and returns a bool. + * Attempt to execute f for one randomly chosen Source that has the specified ledger. If f returns false, another + * randomly chosen Source is used. The process repeats until f returns true. + * + * @param f Function to execute. This function takes the ETL source as an argument, and returns a bool + * @param ledgerSequence f is executed for each Source that has this ledger + * @param retryAfter Time to wait between retries (2 seconds by default) + * server is shutting down + */ + template + void + execute(Func f, uint32_t ledgerSequence, std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}); + + /** + * @brief Choose a new source to forward requests + */ + void + chooseForwardingSource(); +}; + +} // namespace etlng diff --git a/src/etlng/LoadBalancerInterface.hpp b/src/etlng/LoadBalancerInterface.hpp index 3466a6e79..20623d456 100644 --- a/src/etlng/LoadBalancerInterface.hpp +++ b/src/etlng/LoadBalancerInterface.hpp @@ -129,6 +129,15 @@ public: */ virtual std::optional getETLState() noexcept = 0; + + /** + * @brief Stop the load balancer. This will stop all subscription sources. + * @note This function will asynchronously wait for all sources to stop. + * + * @param yield The coroutine context + */ + virtual void + stop(boost::asio::yield_context yield) = 0; }; } // namespace etlng diff --git a/src/etlng/LoaderInterface.hpp b/src/etlng/LoaderInterface.hpp index 929d71679..72cad3192 100644 --- a/src/etlng/LoaderInterface.hpp +++ b/src/etlng/LoaderInterface.hpp @@ -45,7 +45,7 @@ struct LoaderInterface { * @param data The data to load * @return Optional ledger header */ - virtual std::optional + [[nodiscard]] virtual std::optional loadInitialLedger(model::LedgerData const& data) = 0; }; diff --git a/src/etlng/Source.cpp b/src/etlng/Source.cpp new file mode 100644 index 000000000..3f9395446 --- /dev/null +++ b/src/etlng/Source.cpp @@ -0,0 +1,73 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "etlng/Source.hpp" + +#include "etl/NetworkValidatedLedgersInterface.hpp" +#include "etl/impl/ForwardingSource.hpp" +#include "etl/impl/SubscriptionSource.hpp" +#include "etlng/impl/GrpcSource.hpp" +#include "etlng/impl/SourceImpl.hpp" +#include "feed/SubscriptionManagerInterface.hpp" +#include "util/newconfig/ObjectView.hpp" + +#include + +#include +#include +#include +#include + +namespace etlng { + +SourcePtr +makeSource( + util::config::ObjectView const& config, + boost::asio::io_context& ioc, + std::shared_ptr subscriptions, + std::shared_ptr validatedLedgers, + std::chrono::steady_clock::duration forwardingTimeout, + SourceBase::OnConnectHook onConnect, + SourceBase::OnDisconnectHook onDisconnect, + SourceBase::OnLedgerClosedHook onLedgerClosed +) +{ + auto const ip = config.get("ip"); + auto const wsPort = config.get("ws_port"); + auto const grpcPort = config.get("grpc_port"); + + etl::impl::ForwardingSource forwardingSource{ip, wsPort, forwardingTimeout}; + impl::GrpcSource grpcSource{ip, grpcPort}; + auto subscriptionSource = std::make_unique( + ioc, + ip, + wsPort, + std::move(validatedLedgers), + std::move(subscriptions), + std::move(onConnect), + std::move(onDisconnect), + std::move(onLedgerClosed) + ); + + return std::make_unique>( + ip, wsPort, grpcPort, std::move(grpcSource), std::move(subscriptionSource), std::move(forwardingSource) + ); +} + +} // namespace etlng diff --git a/src/etlng/Source.hpp b/src/etlng/Source.hpp new file mode 100644 index 000000000..52c07ef9b --- /dev/null +++ b/src/etlng/Source.hpp @@ -0,0 +1,194 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "data/BackendInterface.hpp" +#include "etl/NetworkValidatedLedgersInterface.hpp" +#include "etlng/InitialLoadObserverInterface.hpp" +#include "feed/SubscriptionManagerInterface.hpp" +#include "rpc/Errors.hpp" +#include "util/newconfig/ObjectView.hpp" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace etlng { + +/** + * @brief Provides an implementation of a ETL source + */ +class SourceBase { +public: + using OnConnectHook = std::function; + using OnDisconnectHook = std::function; + using OnLedgerClosedHook = std::function; + + virtual ~SourceBase() = default; + + /** + * @brief Run subscriptions loop of the source + */ + virtual void + run() = 0; + + /** + * @brief Stop Source. + * @note This method will asynchronously wait for source to be stopped. + * + * @param yield The coroutine context. + */ + virtual void + stop(boost::asio::yield_context yield) = 0; + + /** + * @brief Check if source is connected + * + * @return true if source is connected; false otherwise + */ + [[nodiscard]] virtual bool + isConnected() const = 0; + + /** + * @brief Set the forwarding state of the source. + * + * @param isForwarding Whether to forward or not + */ + virtual void + setForwarding(bool isForwarding) = 0; + + /** + * @brief Represent the source as a JSON object + * + * @return JSON representation of the source + */ + [[nodiscard]] virtual boost::json::object + toJson() const = 0; + + /** @return String representation of the source (for debug) */ + [[nodiscard]] virtual std::string + toString() const = 0; + + /** + * @brief Check if ledger is known by this source. + * + * @param sequence The ledger sequence to check + * @return true if ledger is in the range of this source; false otherwise + */ + [[nodiscard]] virtual bool + hasLedger(uint32_t sequence) const = 0; + + /** + * @brief Fetch data for a specific ledger. + * + * This function will continuously try to fetch data for the specified ledger until the fetch succeeds, the ledger + * is found in the database, or the server is shutting down. + * + * @param sequence Sequence of the ledger to fetch + * @param getObjects Whether to get the account state diff between this ledger and the prior one; defaults to true + * @param getObjectNeighbors Whether to request object neighbors; defaults to false + * @return A std::pair of the response status and the response itself + */ + [[nodiscard]] virtual std::pair + fetchLedger(uint32_t sequence, bool getObjects = true, bool getObjectNeighbors = false) = 0; + + /** + * @brief Download a ledger in full. + * + * @param sequence Sequence of the ledger to download + * @param numMarkers Number of markers to generate for async calls + * @param loader InitialLoadObserverInterface implementation + * @return A std::pair of the data and a bool indicating whether the download was successful + */ + virtual std::pair, bool> + loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, etlng::InitialLoadObserverInterface& loader) = 0; + + /** + * @brief Forward a request to rippled. + * + * @param request The request to forward + * @param forwardToRippledClientIp IP of the client forwarding this request if known + * @param xUserValue Value of the X-User header + * @param yield The coroutine context + * @return Response on success or error on failure + */ + [[nodiscard]] virtual std::expected + forwardToRippled( + boost::json::object const& request, + std::optional const& forwardToRippledClientIp, + std::string_view xUserValue, + boost::asio::yield_context yield + ) const = 0; +}; + +using SourcePtr = std::unique_ptr; + +using SourceFactory = std::function subscriptions, + std::shared_ptr validatedLedgers, + std::chrono::steady_clock::duration forwardingTimeout, + SourceBase::OnConnectHook onConnect, + SourceBase::OnDisconnectHook onDisconnect, + SourceBase::OnLedgerClosedHook onLedgerClosed +)>; + +/** + * @brief Create a source + * + * @param config The configuration to use + * @param ioc The io_context to run on + * @param subscriptions Subscription manager + * @param validatedLedgers The network validated ledgers data structure + * @param forwardingTimeout The timeout for forwarding to rippled + * @param onConnect The hook to call on connect + * @param onDisconnect The hook to call on disconnect + * @param onLedgerClosed The hook to call on ledger closed. This is called when a ledger is closed and the source is set + * as forwarding. + * @return The created source + */ +[[nodiscard]] SourcePtr +makeSource( + util::config::ObjectView const& config, + boost::asio::io_context& ioc, + std::shared_ptr subscriptions, + std::shared_ptr validatedLedgers, + std::chrono::steady_clock::duration forwardingTimeout, + SourceBase::OnConnectHook onConnect, + SourceBase::OnDisconnectHook onDisconnect, + SourceBase::OnLedgerClosedHook onLedgerClosed +); + +} // namespace etlng diff --git a/src/etlng/impl/ForwardingSource.cpp b/src/etlng/impl/ForwardingSource.cpp new file mode 100644 index 000000000..b4fb024bf --- /dev/null +++ b/src/etlng/impl/ForwardingSource.cpp @@ -0,0 +1,116 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "etlng/impl/ForwardingSource.hpp" + +#include "rpc/Errors.hpp" +#include "util/log/Logger.hpp" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace etlng::impl { + +ForwardingSource::ForwardingSource( + std::string ip, + std::string wsPort, + std::chrono::steady_clock::duration forwardingTimeout, + std::chrono::steady_clock::duration connTimeout +) + : log_(fmt::format("ForwardingSource[{}:{}]", ip, wsPort)) + , connectionBuilder_(std::move(ip), std::move(wsPort)) + , forwardingTimeout_{forwardingTimeout} +{ + connectionBuilder_.setConnectionTimeout(connTimeout) + .addHeader( + {boost::beast::http::field::user_agent, fmt::format("{} websocket-client-coro", BOOST_BEAST_VERSION_STRING)} + ); +} + +std::expected +ForwardingSource::forwardToRippled( + boost::json::object const& request, + std::optional const& forwardToRippledClientIp, + std::string_view xUserValue, + boost::asio::yield_context yield +) const +{ + auto connectionBuilder = connectionBuilder_; + if (forwardToRippledClientIp) { + connectionBuilder.addHeader( + {boost::beast::http::field::forwarded, fmt::format("for={}", *forwardToRippledClientIp)} + ); + } + + connectionBuilder.addHeader({"X-User", std::string{xUserValue}}); + + auto expectedConnection = connectionBuilder.connect(yield); + if (not expectedConnection) { + LOG(log_.debug()) << "Couldn't connect to rippled to forward request."; + return std::unexpected{rpc::ClioError::EtlConnectionError}; + } + auto& connection = expectedConnection.value(); + + auto writeError = connection->write(boost::json::serialize(request), yield, forwardingTimeout_); + if (writeError) { + LOG(log_.debug()) << "Error sending request to rippled to forward request."; + return std::unexpected{rpc::ClioError::EtlRequestError}; + } + + auto response = connection->read(yield, forwardingTimeout_); + if (not response) { + if (auto errorCode = response.error().errorCode(); + errorCode.has_value() and errorCode->value() == boost::system::errc::timed_out) { + LOG(log_.debug()) << "Request to rippled timed out"; + return std::unexpected{rpc::ClioError::EtlRequestTimeout}; + } + LOG(log_.debug()) << "Error sending request to rippled to forward request."; + return std::unexpected{rpc::ClioError::EtlRequestError}; + } + + boost::json::value parsedResponse; + try { + parsedResponse = boost::json::parse(*response); + if (not parsedResponse.is_object()) + throw std::runtime_error("response is not an object"); + } catch (std::exception const& e) { + LOG(log_.debug()) << "Error parsing response from rippled: " << e.what() << ". Response: " << *response; + return std::unexpected{rpc::ClioError::EtlInvalidResponse}; + } + + auto responseObject = parsedResponse.as_object(); + responseObject["forwarded"] = true; + + return responseObject; +} + +} // namespace etlng::impl diff --git a/src/etlng/impl/ForwardingSource.hpp b/src/etlng/impl/ForwardingSource.hpp new file mode 100644 index 000000000..8624e4e50 --- /dev/null +++ b/src/etlng/impl/ForwardingSource.hpp @@ -0,0 +1,70 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "rpc/Errors.hpp" +#include "util/log/Logger.hpp" +#include "util/requests/WsConnection.hpp" + +#include +#include + +#include +#include +#include +#include +#include + +namespace etlng::impl { + +class ForwardingSource { + util::Logger log_; + util::requests::WsConnectionBuilder connectionBuilder_; + std::chrono::steady_clock::duration forwardingTimeout_; + + static constexpr std::chrono::seconds kCONNECTION_TIMEOUT{3}; + +public: + ForwardingSource( + std::string ip, + std::string wsPort, + std::chrono::steady_clock::duration forwardingTimeout, + std::chrono::steady_clock::duration connTimeout = ForwardingSource::kCONNECTION_TIMEOUT + ); + + /** + * @brief Forward a request to rippled. + * + * @param request The request to forward + * @param forwardToRippledClientIp IP of the client forwarding this request if known + * @param xUserValue Optional value for X-User header + * @param yield The coroutine context + * @return Response on success or error on failure + */ + std::expected + forwardToRippled( + boost::json::object const& request, + std::optional const& forwardToRippledClientIp, + std::string_view xUserValue, + boost::asio::yield_context yield + ) const; +}; + +} // namespace etlng::impl diff --git a/src/etlng/impl/Loading.hpp b/src/etlng/impl/Loading.hpp index 435dcb4e4..caa677bce 100644 --- a/src/etlng/impl/Loading.hpp +++ b/src/etlng/impl/Loading.hpp @@ -39,7 +39,6 @@ #include #include -#include #include #include #include diff --git a/src/etlng/impl/Registry.hpp b/src/etlng/impl/Registry.hpp index d5d262388..921b70811 100644 --- a/src/etlng/impl/Registry.hpp +++ b/src/etlng/impl/Registry.hpp @@ -24,7 +24,6 @@ #include -#include #include #include #include @@ -81,7 +80,7 @@ concept ContainsValidHook = HasLedgerDataHook or HasInitialDataHook or template concept NoTwoOfKind = not(HasLedgerDataHook and HasTransactionHook) and - not(HasInitialDataHook and HasInitialTransactionHook) and not(HasInitialDataHook and HasObjectHook) and + not(HasInitialDataHook and HasInitialTransactionHook) and not(HasInitialObjectsHook and HasInitialObjectHook); template @@ -216,4 +215,10 @@ public: } }; +static auto +makeRegistry(auto&&... exts) +{ + return std::make_unique...>>(std::forward(exts)...); +} + } // namespace etlng::impl diff --git a/src/etlng/impl/SourceImpl.hpp b/src/etlng/impl/SourceImpl.hpp new file mode 100644 index 000000000..1c99d973b --- /dev/null +++ b/src/etlng/impl/SourceImpl.hpp @@ -0,0 +1,232 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "etl/impl/ForwardingSource.hpp" +#include "etl/impl/SubscriptionSource.hpp" +#include "etlng/InitialLoadObserverInterface.hpp" +#include "etlng/Source.hpp" +#include "etlng/impl/GrpcSource.hpp" +#include "rpc/Errors.hpp" + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace etlng::impl { + +/** + * @brief Provides an implementation of a ETL source + * + * @tparam GrpcSourceType The type of the gRPC source + * @tparam SubscriptionSourceTypePtr The type of the subscription source + * @tparam ForwardingSourceType The type of the forwarding source + */ +template < + typename GrpcSourceType = GrpcSource, + typename SubscriptionSourceTypePtr = std::unique_ptr, + typename ForwardingSourceType = etl::impl::ForwardingSource> +class SourceImpl : public SourceBase { + std::string ip_; + std::string wsPort_; + std::string grpcPort_; + + GrpcSourceType grpcSource_; + SubscriptionSourceTypePtr subscriptionSource_; + ForwardingSourceType forwardingSource_; + +public: + /** + * @brief Construct a new SourceImpl object + * + * @param ip The IP of the source + * @param wsPort The web socket port of the source + * @param grpcPort The gRPC port of the source + * @param grpcSource The gRPC source + * @param subscriptionSource The subscription source + * @param forwardingSource The forwarding source + */ + template + requires std::is_same_v and + std::is_same_v + SourceImpl( + std::string ip, + std::string wsPort, + std::string grpcPort, + SomeGrpcSourceType&& grpcSource, + SubscriptionSourceTypePtr subscriptionSource, + SomeForwardingSourceType&& forwardingSource + ) + : ip_(std::move(ip)) + , wsPort_(std::move(wsPort)) + , grpcPort_(std::move(grpcPort)) + , grpcSource_(std::forward(grpcSource)) + , subscriptionSource_(std::move(subscriptionSource)) + , forwardingSource_(std::forward(forwardingSource)) + { + } + + /** + * @brief Run subscriptions loop of the source + */ + void + run() final + { + subscriptionSource_->run(); + } + + void + stop(boost::asio::yield_context yield) final + { + subscriptionSource_->stop(yield); + } + + /** + * @brief Check if source is connected + * + * @return true if source is connected; false otherwise + */ + bool + isConnected() const final + { + return subscriptionSource_->isConnected(); + } + + /** + * @brief Set the forwarding state of the source. + * + * @param isForwarding Whether to forward or not + */ + void + setForwarding(bool isForwarding) final + { + subscriptionSource_->setForwarding(isForwarding); + } + + /** + * @brief Represent the source as a JSON object + * + * @return JSON representation of the source + */ + boost::json::object + toJson() const final + { + boost::json::object res; + + res["validated_range"] = subscriptionSource_->validatedRange(); + res["is_connected"] = std::to_string(static_cast(subscriptionSource_->isConnected())); + res["ip"] = ip_; + res["ws_port"] = wsPort_; + res["grpc_port"] = grpcPort_; + + auto last = subscriptionSource_->lastMessageTime(); + if (last.time_since_epoch().count() != 0) { + res["last_msg_age_seconds"] = std::to_string( + std::chrono::duration_cast(std::chrono::steady_clock::now() - last).count() + ); + } + + return res; + } + + /** @return String representation of the source (for debug) */ + std::string + toString() const final + { + return "{validated range: " + subscriptionSource_->validatedRange() + ", ip: " + ip_ + + ", web socket port: " + wsPort_ + ", grpc port: " + grpcPort_ + "}"; + } + + /** + * @brief Check if ledger is known by this source. + * + * @param sequence The ledger sequence to check + * @return true if ledger is in the range of this source; false otherwise + */ + bool + hasLedger(uint32_t sequence) const final + { + return subscriptionSource_->hasLedger(sequence); + } + + /** + * @brief Fetch data for a specific ledger. + * + * This function will continuously try to fetch data for the specified ledger until the fetch succeeds, the ledger + * is found in the database, or the server is shutting down. + * + * @param sequence Sequence of the ledger to fetch + * @param getObjects Whether to get the account state diff between this ledger and the prior one; defaults to true + * @param getObjectNeighbors Whether to request object neighbors; defaults to false + * @return A std::pair of the response status and the response itself + */ + std::pair + fetchLedger(uint32_t sequence, bool getObjects = true, bool getObjectNeighbors = false) final + { + return grpcSource_.fetchLedger(sequence, getObjects, getObjectNeighbors); + } + + /** + * @brief Download a ledger in full. + * + * @param sequence Sequence of the ledger to download + * @param numMarkers Number of markers to generate for async calls + * @param loader InitialLoadObserverInterface implementation + * @return A std::pair of the data and a bool indicating whether the download was successful + */ + std::pair, bool> + loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, etlng::InitialLoadObserverInterface& loader) final + { + return grpcSource_.loadInitialLedger(sequence, numMarkers, loader); + } + + /** + * @brief Forward a request to rippled. + * + * @param request The request to forward + * @param forwardToRippledClientIp IP of the client forwarding this request if known + * @param xUserValue Optional value of the X-User header + * @param yield The coroutine context + * @return Response or ClioError + */ + std::expected + forwardToRippled( + boost::json::object const& request, + std::optional const& forwardToRippledClientIp, + std::string_view xUserValue, + boost::asio::yield_context yield + ) const final + { + return forwardingSource_.forwardToRippled(request, forwardToRippledClientIp, xUserValue, yield); + } +}; + +} // namespace etlng::impl diff --git a/src/etlng/impl/ext/NFT.hpp b/src/etlng/impl/ext/NFT.hpp index 65e45233c..6d6a146cf 100644 --- a/src/etlng/impl/ext/NFT.hpp +++ b/src/etlng/impl/ext/NFT.hpp @@ -20,15 +20,11 @@ #pragma once #include "data/BackendInterface.hpp" -#include "data/DBHelpers.hpp" -#include "etl/NFTHelpers.hpp" #include "etlng/Models.hpp" #include "util/log/Logger.hpp" #include #include -#include -#include namespace etlng::impl { diff --git a/src/rpc/RPCEngine.hpp b/src/rpc/RPCEngine.hpp index 13216c20d..fe46db104 100644 --- a/src/rpc/RPCEngine.hpp +++ b/src/rpc/RPCEngine.hpp @@ -20,6 +20,7 @@ #pragma once #include "data/BackendInterface.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "rpc/Errors.hpp" #include "rpc/RPCHelpers.hpp" #include "rpc/WorkQueue.hpp" @@ -55,7 +56,7 @@ namespace rpc { /** * @brief The RPC engine that ties all RPC-related functionality together. */ -template +template class RPCEngine { util::Logger perfLog_{"Performance"}; util::Logger log_{"RPC"}; @@ -67,7 +68,7 @@ class RPCEngine { std::shared_ptr handlerProvider_; - impl::ForwardingProxy forwardingProxy_; + impl::ForwardingProxy forwardingProxy_; std::optional responseCache_; @@ -86,7 +87,7 @@ public: RPCEngine( util::config::ClioConfigDefinition const& config, std::shared_ptr const& backend, - std::shared_ptr const& balancer, + std::shared_ptr const& balancer, web::dosguard::DOSGuardInterface const& dosGuard, WorkQueue& workQueue, CountersType& counters, @@ -128,7 +129,7 @@ public: makeRPCEngine( util::config::ClioConfigDefinition const& config, std::shared_ptr const& backend, - std::shared_ptr const& balancer, + std::shared_ptr const& balancer, web::dosguard::DOSGuardInterface const& dosGuard, WorkQueue& workQueue, CountersType& counters, diff --git a/src/rpc/common/impl/ForwardingProxy.hpp b/src/rpc/common/impl/ForwardingProxy.hpp index c60773df1..2c07b79ab 100644 --- a/src/rpc/common/impl/ForwardingProxy.hpp +++ b/src/rpc/common/impl/ForwardingProxy.hpp @@ -19,6 +19,7 @@ #pragma once +#include "etlng/LoadBalancerInterface.hpp" #include "rpc/Errors.hpp" #include "rpc/RPCHelpers.hpp" #include "rpc/common/Types.hpp" @@ -31,20 +32,21 @@ #include #include #include +#include namespace rpc::impl { -template +template class ForwardingProxy { util::Logger log_{"RPC"}; - std::shared_ptr balancer_; + std::shared_ptr balancer_; std::reference_wrapper counters_; std::shared_ptr handlerProvider_; public: ForwardingProxy( - std::shared_ptr const& balancer, + std::shared_ptr const& balancer, CountersType& counters, std::shared_ptr const& handlerProvider ) diff --git a/src/rpc/common/impl/HandlerProvider.cpp b/src/rpc/common/impl/HandlerProvider.cpp index cf4485cc2..6707cca07 100644 --- a/src/rpc/common/impl/HandlerProvider.cpp +++ b/src/rpc/common/impl/HandlerProvider.cpp @@ -21,7 +21,8 @@ #include "data/AmendmentCenterInterface.hpp" #include "data/BackendInterface.hpp" -#include "etl/ETLService.hpp" +#include "etlng/ETLServiceInterface.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp" #include "rpc/Counters.hpp" #include "rpc/common/AnyHandler.hpp" @@ -72,8 +73,8 @@ ProductionHandlerProvider::ProductionHandlerProvider( util::config::ClioConfigDefinition const& config, std::shared_ptr const& backend, std::shared_ptr const& subscriptionManager, - std::shared_ptr const& balancer, - std::shared_ptr const& etl, + std::shared_ptr const& balancer, + std::shared_ptr const& etl, std::shared_ptr const& amendmentCenter, Counters const& counters ) diff --git a/src/rpc/common/impl/HandlerProvider.hpp b/src/rpc/common/impl/HandlerProvider.hpp index 89ea5661f..2c07428cf 100644 --- a/src/rpc/common/impl/HandlerProvider.hpp +++ b/src/rpc/common/impl/HandlerProvider.hpp @@ -21,6 +21,8 @@ #include "data/AmendmentCenterInterface.hpp" #include "data/BackendInterface.hpp" +#include "etlng/ETLServiceInterface.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp" #include "rpc/common/AnyHandler.hpp" #include "rpc/common/HandlerProvider.hpp" @@ -32,10 +34,6 @@ #include #include -namespace etl { -class ETLService; -class LoadBalancer; -} // namespace etl namespace rpc { class Counters; } // namespace rpc @@ -55,8 +53,8 @@ public: util::config::ClioConfigDefinition const& config, std::shared_ptr const& backend, std::shared_ptr const& subscriptionManager, - std::shared_ptr const& balancer, - std::shared_ptr const& etl, + std::shared_ptr const& balancer, + std::shared_ptr const& etl, std::shared_ptr const& amendmentCenter, Counters const& counters ); diff --git a/src/rpc/handlers/ServerInfo.hpp b/src/rpc/handlers/ServerInfo.hpp index 008450721..085066f7c 100644 --- a/src/rpc/handlers/ServerInfo.hpp +++ b/src/rpc/handlers/ServerInfo.hpp @@ -21,6 +21,8 @@ #include "data/BackendInterface.hpp" #include "data/DBHelpers.hpp" +#include "etlng/ETLServiceInterface.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp" #include "rpc/Errors.hpp" #include "rpc/JS.hpp" @@ -49,10 +51,6 @@ #include #include -namespace etl { -class ETLService; -class LoadBalancer; -} // namespace etl namespace rpc { class Counters; } // namespace rpc @@ -62,18 +60,16 @@ namespace rpc { /** * @brief Contains common functionality for handling the `server_info` command * - * @tparam LoadBalancerType The type of the load balancer - * @tparam ETLServiceType The type of the ETL service * @tparam CountersType The type of the counters */ -template +template class BaseServerInfoHandler { static constexpr auto kBACKEND_COUNTERS_KEY = "backend_counters"; std::shared_ptr backend_; std::shared_ptr subscriptions_; - std::shared_ptr balancer_; - std::shared_ptr etl_; + std::shared_ptr balancer_; + std::shared_ptr etl_; std::reference_wrapper counters_; public: @@ -158,8 +154,8 @@ public: BaseServerInfoHandler( std::shared_ptr const& backend, std::shared_ptr const& subscriptions, - std::shared_ptr const& balancer, - std::shared_ptr const& etl, + std::shared_ptr const& balancer, + std::shared_ptr const& etl, CountersType const& counters ) : backend_(backend) @@ -352,6 +348,6 @@ private: * * For more details see: https://xrpl.org/server_info-clio.html */ -using ServerInfoHandler = BaseServerInfoHandler; +using ServerInfoHandler = BaseServerInfoHandler; } // namespace rpc diff --git a/src/rpc/handlers/Tx.hpp b/src/rpc/handlers/Tx.hpp index b05a64c1e..acc5de4a3 100644 --- a/src/rpc/handlers/Tx.hpp +++ b/src/rpc/handlers/Tx.hpp @@ -22,6 +22,7 @@ #include "data/BackendInterface.hpp" #include "data/Types.hpp" #include "etl/ETLService.hpp" +#include "etlng/ETLServiceInterface.hpp" #include "rpc/Errors.hpp" #include "rpc/JS.hpp" #include "rpc/RPCHelpers.hpp" @@ -52,14 +53,13 @@ namespace rpc { /** - * @brief Contains common functionality for handling the `tx` command + * @brief The tx method retrieves information on a single transaction, by its identifying hash. * - * @tparam ETLServiceType The type of the ETL service to use + * For more details see: https://xrpl.org/tx.html */ -template -class BaseTxHandler { +class TxHandler { std::shared_ptr sharedPtrBackend_; - std::shared_ptr etl_; + std::shared_ptr etl_; public: /** @@ -95,14 +95,14 @@ public: using Result = HandlerReturnType; /** - * @brief Construct a new BaseTxHandler object + * @brief Construct a new TxHandler object * * @param sharedPtrBackend The backend to use * @param etl The ETL service to use */ - BaseTxHandler( + TxHandler( std::shared_ptr const& sharedPtrBackend, - std::shared_ptr const& etl + std::shared_ptr const& etl ) : sharedPtrBackend_(sharedPtrBackend), etl_(etl) { @@ -183,7 +183,7 @@ public: dbResponse = sharedPtrBackend_->fetchTransaction(ripple::uint256{input.transaction->c_str()}, ctx.yield); } - auto output = BaseTxHandler::Output{.apiVersion = ctx.apiVersion}; + auto output = TxHandler::Output{.apiVersion = ctx.apiVersion}; if (!dbResponse) { if (rangeSupplied && input.transaction) // ranges not for ctid @@ -320,7 +320,7 @@ private: friend Input tag_invoke(boost::json::value_to_tag, boost::json::value const& jv) { - auto input = BaseTxHandler::Input{}; + auto input = TxHandler::Input{}; auto const& jsonObject = jv.as_object(); if (jsonObject.contains(JS(transaction))) @@ -344,10 +344,4 @@ private: } }; -/** - * @brief The tx method retrieves information on a single transaction, by its identifying hash. - * - * For more details see: https://xrpl.org/tx.html - */ -using TxHandler = BaseTxHandler; } // namespace rpc diff --git a/src/util/newconfig/ConfigDefinition.hpp b/src/util/newconfig/ConfigDefinition.hpp index 030b8d160..b00cbd1cb 100644 --- a/src/util/newconfig/ConfigDefinition.hpp +++ b/src/util/newconfig/ConfigDefinition.hpp @@ -293,7 +293,7 @@ static ClioConfigDefinition gClioConfig = ClioConfigDefinition{ {"database.cassandra.certfile", ConfigValue{ConfigType::String}.optional()}, {"allow_no_etl", ConfigValue{ConfigType::Boolean}.defaultValue(false)}, - + {"__ng_etl", ConfigValue{ConfigType::Boolean}.defaultValue(false)}, {"etl_sources.[].ip", Array{ConfigValue{ConfigType::String}.optional().withConstraint(gValidateIp)}}, {"etl_sources.[].ws_port", Array{ConfigValue{ConfigType::String}.optional().withConstraint(gValidatePort)}}, {"etl_sources.[].grpc_port", Array{ConfigValue{ConfigType::String}.optional().withConstraint(gValidatePort)}}, diff --git a/src/web/RPCServerHandler.hpp b/src/web/RPCServerHandler.hpp index 632d9551e..a0abacb7c 100644 --- a/src/web/RPCServerHandler.hpp +++ b/src/web/RPCServerHandler.hpp @@ -20,6 +20,7 @@ #pragma once #include "data/BackendInterface.hpp" +#include "etlng/ETLServiceInterface.hpp" #include "rpc/Errors.hpp" #include "rpc/Factories.hpp" #include "rpc/JS.hpp" @@ -58,11 +59,11 @@ namespace web { * * Note: see @ref web::SomeServerHandler concept */ -template +template class RPCServerHandler { std::shared_ptr const backend_; std::shared_ptr const rpcEngine_; - std::shared_ptr const etl_; + std::shared_ptr const etl_; util::TagDecoratorFactory const tagFactory_; rpc::impl::ProductionAPIVersionParser apiVersionParser_; // can be injected if needed @@ -82,7 +83,7 @@ public: util::config::ClioConfigDefinition const& config, std::shared_ptr const& backend, std::shared_ptr const& rpcEngine, - std::shared_ptr const& etl + std::shared_ptr const& etl ) : backend_(backend) , rpcEngine_(rpcEngine) diff --git a/src/web/ng/RPCServerHandler.hpp b/src/web/ng/RPCServerHandler.hpp index f8dbcb183..fec8a22f7 100644 --- a/src/web/ng/RPCServerHandler.hpp +++ b/src/web/ng/RPCServerHandler.hpp @@ -20,6 +20,7 @@ #pragma once #include "data/BackendInterface.hpp" +#include "etlng/ETLServiceInterface.hpp" #include "rpc/Errors.hpp" #include "rpc/Factories.hpp" #include "rpc/JS.hpp" @@ -64,11 +65,11 @@ namespace web::ng { * * Note: see @ref web::SomeServerHandler concept */ -template +template class RPCServerHandler { std::shared_ptr const backend_; std::shared_ptr const rpcEngine_; - std::shared_ptr const etl_; + std::shared_ptr const etl_; util::TagDecoratorFactory const tagFactory_; rpc::impl::ProductionAPIVersionParser apiVersionParser_; // can be injected if needed @@ -88,7 +89,7 @@ public: util::config::ClioConfigDefinition const& config, std::shared_ptr const& backend, std::shared_ptr const& rpcEngine, - std::shared_ptr const& etl + std::shared_ptr const& etl ) : backend_(backend) , rpcEngine_(rpcEngine) diff --git a/tests/common/util/MockETLService.hpp b/tests/common/util/MockETLService.hpp index 618e5ab57..f398ad9cd 100644 --- a/tests/common/util/MockETLService.hpp +++ b/tests/common/util/MockETLService.hpp @@ -20,21 +20,21 @@ #pragma once #include "etl/ETLState.hpp" +#include "etlng/ETLServiceInterface.hpp" #include #include #include -#include #include #include -struct MockETLService { - MOCK_METHOD(boost::json::object, getInfo, (), (const)); - MOCK_METHOD(std::chrono::time_point, getLastPublish, (), (const)); - MOCK_METHOD(std::uint32_t, lastPublishAgeSeconds, (), (const)); - MOCK_METHOD(std::uint32_t, lastCloseAgeSeconds, (), (const)); - MOCK_METHOD(bool, isAmendmentBlocked, (), (const)); - MOCK_METHOD(bool, isCorruptionDetected, (), (const)); - MOCK_METHOD(std::optional, getETLState, (), (const)); +struct MockETLService : etlng::ETLServiceInterface { + MOCK_METHOD(void, run, (), (override)); + MOCK_METHOD(void, stop, (), (override)); + MOCK_METHOD(boost::json::object, getInfo, (), (const, override)); + MOCK_METHOD(std::uint32_t, lastCloseAgeSeconds, (), (const, override)); + MOCK_METHOD(bool, isAmendmentBlocked, (), (const, override)); + MOCK_METHOD(bool, isCorruptionDetected, (), (const, override)); + MOCK_METHOD(std::optional, getETLState, (), (const, override)); }; diff --git a/tests/common/util/MockLoadBalancer.hpp b/tests/common/util/MockLoadBalancer.hpp index 12bcfea78..f0b9dd313 100644 --- a/tests/common/util/MockLoadBalancer.hpp +++ b/tests/common/util/MockLoadBalancer.hpp @@ -38,22 +38,6 @@ #include #include -struct MockLoadBalancer { - using RawLedgerObjectType = FakeLedgerObject; - - MOCK_METHOD(void, loadInitialLedger, (std::uint32_t, bool), ()); - MOCK_METHOD(std::optional, fetchLedger, (uint32_t, bool, bool), ()); - MOCK_METHOD(boost::json::value, toJson, (), (const)); - - using ForwardToRippledReturnType = std::expected; - MOCK_METHOD( - ForwardToRippledReturnType, - forwardToRippled, - (boost::json::object const&, std::optional const&, bool, boost::asio::yield_context), - (const) - ); -}; - struct MockNgLoadBalancer : etlng::LoadBalancerInterface { using RawLedgerObjectType = FakeLedgerObject; @@ -85,4 +69,7 @@ struct MockNgLoadBalancer : etlng::LoadBalancerInterface { (boost::json::object const&, std::optional const&, bool, boost::asio::yield_context), (override) ); + MOCK_METHOD(void, stop, (boost::asio::yield_context), ()); }; + +using MockLoadBalancer = MockNgLoadBalancer; diff --git a/tests/common/util/MockSource.hpp b/tests/common/util/MockSource.hpp index 58984d485..a982f3aef 100644 --- a/tests/common/util/MockSource.hpp +++ b/tests/common/util/MockSource.hpp @@ -60,7 +60,7 @@ struct MockSource : etl::SourceBase { (uint32_t, bool, bool), (override) ); - MOCK_METHOD((std::pair, bool>), loadInitialLedger, (uint32_t, uint32_t, bool), (override)); + MOCK_METHOD((std::pair, bool>), loadInitialLedger, (uint32_t, uint32_t), (override)); using ForwardToRippledReturnType = std::expected; MOCK_METHOD( @@ -132,9 +132,9 @@ public: } std::pair, bool> - loadInitialLedger(uint32_t sequence, uint32_t maxLedger, bool getObjects) override + loadInitialLedger(uint32_t sequence, uint32_t maxLedger) override { - return mock_->loadInitialLedger(sequence, maxLedger, getObjects); + return mock_->loadInitialLedger(sequence, maxLedger); } std::expected diff --git a/tests/common/util/MockSourceNg.hpp b/tests/common/util/MockSourceNg.hpp new file mode 100644 index 000000000..7137e47ec --- /dev/null +++ b/tests/common/util/MockSourceNg.hpp @@ -0,0 +1,245 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2023, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== +#pragma once + +#include "etl/NetworkValidatedLedgersInterface.hpp" +#include "etlng/InitialLoadObserverInterface.hpp" +#include "etlng/Source.hpp" +#include "feed/SubscriptionManagerInterface.hpp" +#include "rpc/Errors.hpp" +#include "util/newconfig/ObjectView.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +struct MockSourceNg : etlng::SourceBase { + MOCK_METHOD(void, run, (), (override)); + MOCK_METHOD(void, stop, (boost::asio::yield_context), (override)); + MOCK_METHOD(bool, isConnected, (), (const, override)); + MOCK_METHOD(void, setForwarding, (bool), (override)); + MOCK_METHOD(boost::json::object, toJson, (), (const, override)); + MOCK_METHOD(std::string, toString, (), (const, override)); + MOCK_METHOD(bool, hasLedger, (uint32_t), (const, override)); + MOCK_METHOD( + (std::pair), + fetchLedger, + (uint32_t, bool, bool), + (override) + ); + MOCK_METHOD( + (std::pair, bool>), + loadInitialLedger, + (uint32_t, uint32_t, etlng::InitialLoadObserverInterface&), + (override) + ); + + using ForwardToRippledReturnType = std::expected; + MOCK_METHOD( + ForwardToRippledReturnType, + forwardToRippled, + (boost::json::object const&, std::optional const&, std::string_view, boost::asio::yield_context), + (const, override) + ); +}; + +template