From 58a1833cf25cdfd6188fb0f1f5d3b6638d0be9d6 Mon Sep 17 00:00:00 2001 From: Sergey Kuznetsov Date: Tue, 5 Mar 2024 18:09:29 +0000 Subject: [PATCH] Add forwarding cache (#1204) Fixes #51. --- .gitignore | 1 + CMakeLists.txt | 68 +++++++++++ docs/configure-clio.md | 14 +++ docs/examples/config/example-config.json | 1 + src/etl/ETLState.hpp | 2 +- src/etl/LoadBalancer.cpp | 28 ++++- src/etl/LoadBalancer.hpp | 7 +- src/etl/Source.cpp | 10 +- src/etl/Source.hpp | 6 +- src/etl/impl/ForwardingCache.cpp | 134 ++++++++++++++++++++++ src/etl/impl/ForwardingCache.hpp | 125 ++++++++++++++++++++ src/etl/impl/ForwardingSource.cpp | 1 + src/etl/impl/ForwardingSource.hpp | 1 + src/etl/impl/SubscriptionSource.cpp | 2 + src/etl/impl/SubscriptionSource.hpp | 7 ++ src/util/Mutex.hpp | 29 +++-- unittests/etl/ETLStateTests.cpp | 2 +- unittests/etl/ForwardingCacheTests.cpp | 133 +++++++++++++++++++++ unittests/etl/ForwardingSourceTests.cpp | 29 ++++- unittests/etl/SourceTests.cpp | 3 +- unittests/etl/SubscriptionSourceTests.cpp | 31 ++++- 21 files changed, 605 insertions(+), 29 deletions(-) create mode 100644 src/etl/impl/ForwardingCache.cpp create mode 100644 src/etl/impl/ForwardingCache.hpp create mode 100644 unittests/etl/ForwardingCacheTests.cpp diff --git a/.gitignore b/.gitignore index 6a77ab42..ac2a7062 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ .cache .vscode .python-version +.DS_Store CMakeUserPresets.json config.json src/main/impl/Build.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 1afa081a..c84cd180 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -102,6 +102,7 @@ target_sources( src/etl/LoadBalancer.cpp src/etl/CacheLoaderSettings.cpp src/etl/Source.cpp + src/etl/impl/ForwardingCache.cpp src/etl/impl/ForwardingSource.cpp src/etl/impl/GrpcSource.cpp src/etl/impl/SubscriptionSource.cpp @@ -191,28 +192,51 @@ if (tests) # Common unittests/ConfigTests.cpp unittests/data/BackendCountersTests.cpp + unittests/data/BackendCountersTests.cpp + unittests/data/BackendFactoryTests.cpp unittests/data/BackendFactoryTests.cpp unittests/data/cassandra/AsyncExecutorTests.cpp # Webserver + unittests/data/cassandra/AsyncExecutorTests.cpp + # Webserver + unittests/data/cassandra/BackendTests.cpp unittests/data/cassandra/BackendTests.cpp unittests/data/cassandra/BaseTests.cpp + unittests/data/cassandra/BaseTests.cpp + unittests/data/cassandra/ExecutionStrategyTests.cpp unittests/data/cassandra/ExecutionStrategyTests.cpp unittests/data/cassandra/RetryPolicyTests.cpp + unittests/data/cassandra/RetryPolicyTests.cpp + unittests/data/cassandra/SettingsProviderTests.cpp unittests/data/cassandra/SettingsProviderTests.cpp unittests/DOSGuardTests.cpp unittests/etl/AmendmentBlockHandlerTests.cpp + unittests/etl/AmendmentBlockHandlerTests.cpp unittests/etl/CacheLoaderSettingsTests.cpp unittests/etl/CacheLoaderTests.cpp + unittests/etl/CacheLoaderTests.cpp unittests/etl/CursorProviderTests.cpp unittests/etl/ETLStateTests.cpp + unittests/etl/ETLStateTests.cpp + unittests/etl/ExtractionDataPipeTests.cpp unittests/etl/ExtractionDataPipeTests.cpp unittests/etl/ExtractorTests.cpp + unittests/etl/ExtractorTests.cpp + unittests/etl/ForwardingCacheTests.cpp + unittests/etl/ForwardingSourceTests.cpp unittests/etl/ForwardingSourceTests.cpp unittests/etl/GrpcSourceTests.cpp + unittests/etl/GrpcSourceTests.cpp + unittests/etl/LedgerPublisherTests.cpp unittests/etl/LedgerPublisherTests.cpp unittests/etl/SourceTests.cpp + unittests/etl/SourceTests.cpp + unittests/etl/SubscriptionSourceDependenciesTests.cpp unittests/etl/SubscriptionSourceDependenciesTests.cpp unittests/etl/SubscriptionSourceTests.cpp + unittests/etl/SubscriptionSourceTests.cpp + unittests/etl/TransformerTests.cpp + # RPC unittests/etl/TransformerTests.cpp # RPC unittests/feed/BookChangesFeedTests.cpp @@ -229,48 +253,92 @@ if (tests) unittests/Playground.cpp unittests/ProfilerTests.cpp unittests/rpc/AmendmentsTests.cpp + unittests/rpc/AmendmentsTests.cpp + unittests/rpc/APIVersionTests.cpp unittests/rpc/APIVersionTests.cpp unittests/rpc/BaseTests.cpp + unittests/rpc/BaseTests.cpp + unittests/rpc/CountersTests.cpp unittests/rpc/CountersTests.cpp unittests/rpc/ErrorTests.cpp + unittests/rpc/ErrorTests.cpp + unittests/rpc/ForwardingProxyTests.cpp unittests/rpc/ForwardingProxyTests.cpp unittests/rpc/handlers/AccountChannelsTests.cpp + unittests/rpc/handlers/AccountChannelsTests.cpp + unittests/rpc/handlers/AccountCurrenciesTests.cpp unittests/rpc/handlers/AccountCurrenciesTests.cpp unittests/rpc/handlers/AccountInfoTests.cpp + unittests/rpc/handlers/AccountInfoTests.cpp + unittests/rpc/handlers/AccountLinesTests.cpp unittests/rpc/handlers/AccountLinesTests.cpp unittests/rpc/handlers/AccountNFTsTests.cpp + unittests/rpc/handlers/AccountNFTsTests.cpp + unittests/rpc/handlers/AccountObjectsTests.cpp unittests/rpc/handlers/AccountObjectsTests.cpp unittests/rpc/handlers/AccountOffersTests.cpp + unittests/rpc/handlers/AccountOffersTests.cpp + unittests/rpc/handlers/AccountTxTests.cpp unittests/rpc/handlers/AccountTxTests.cpp unittests/rpc/handlers/AMMInfoTests.cpp # Backend + unittests/rpc/handlers/AMMInfoTests.cpp + # Backend + unittests/rpc/handlers/BookChangesTests.cpp unittests/rpc/handlers/BookChangesTests.cpp unittests/rpc/handlers/BookOffersTests.cpp + unittests/rpc/handlers/BookOffersTests.cpp + unittests/rpc/handlers/DefaultProcessorTests.cpp unittests/rpc/handlers/DefaultProcessorTests.cpp unittests/rpc/handlers/DepositAuthorizedTests.cpp + unittests/rpc/handlers/DepositAuthorizedTests.cpp + unittests/rpc/handlers/GatewayBalancesTests.cpp unittests/rpc/handlers/GatewayBalancesTests.cpp unittests/rpc/handlers/LedgerDataTests.cpp + unittests/rpc/handlers/LedgerDataTests.cpp + unittests/rpc/handlers/LedgerEntryTests.cpp unittests/rpc/handlers/LedgerEntryTests.cpp unittests/rpc/handlers/LedgerRangeTests.cpp + unittests/rpc/handlers/LedgerRangeTests.cpp + unittests/rpc/handlers/LedgerTests.cpp unittests/rpc/handlers/LedgerTests.cpp unittests/rpc/handlers/NFTBuyOffersTests.cpp + unittests/rpc/handlers/NFTBuyOffersTests.cpp + unittests/rpc/handlers/NFTHistoryTests.cpp unittests/rpc/handlers/NFTHistoryTests.cpp unittests/rpc/handlers/NFTInfoTests.cpp + unittests/rpc/handlers/NFTInfoTests.cpp + unittests/rpc/handlers/NFTsByIssuerTest.cpp unittests/rpc/handlers/NFTsByIssuerTest.cpp unittests/rpc/handlers/NFTSellOffersTests.cpp + unittests/rpc/handlers/NFTSellOffersTests.cpp + unittests/rpc/handlers/NoRippleCheckTests.cpp unittests/rpc/handlers/NoRippleCheckTests.cpp unittests/rpc/handlers/PingTests.cpp + unittests/rpc/handlers/PingTests.cpp + unittests/rpc/handlers/RandomTests.cpp unittests/rpc/handlers/RandomTests.cpp unittests/rpc/handlers/ServerInfoTests.cpp + unittests/rpc/handlers/ServerInfoTests.cpp + unittests/rpc/handlers/SubscribeTests.cpp unittests/rpc/handlers/SubscribeTests.cpp unittests/rpc/handlers/TestHandlerTests.cpp + unittests/rpc/handlers/TestHandlerTests.cpp + unittests/rpc/handlers/TransactionEntryTests.cpp unittests/rpc/handlers/TransactionEntryTests.cpp unittests/rpc/handlers/TxTests.cpp + unittests/rpc/handlers/TxTests.cpp unittests/rpc/handlers/UnsubscribeTests.cpp + unittests/rpc/handlers/UnsubscribeTests.cpp + unittests/rpc/handlers/VersionHandlerTests.cpp unittests/rpc/handlers/VersionHandlerTests.cpp unittests/rpc/JsonBoolTests.cpp # RPC handlers + unittests/rpc/JsonBoolTests.cpp + # RPC handlers unittests/rpc/RPCHelpersTests.cpp + unittests/rpc/RPCHelpersTests.cpp + unittests/rpc/WorkQueueTests.cpp unittests/rpc/WorkQueueTests.cpp unittests/util/AssertTests.cpp unittests/util/async/AnyExecutionContextTests.cpp diff --git a/docs/configure-clio.md b/docs/configure-clio.md index f8947d7e..bdfb3c4e 100644 --- a/docs/configure-clio.md +++ b/docs/configure-clio.md @@ -83,3 +83,17 @@ By default Clio checks admin privileges by IP address from requests (only `127.0 If the password is presented in the config, Clio will check the Authorization header (if any) in each request for the password. The Authorization header should contain the type `Password`, and the password from the config (e.g. `Password secret`). Exactly equal password gains admin rights for the request or a websocket connection. + +## ETL sources forwarding cache + +Clio can cache requests to ETL sources to reduce the load on the ETL source. +Only following commands are cached: `server_info`, `server_state`, `server_definitions`, `fee`, `ledger_closed`. +By default the forwarding cache is off. +To enable the caching for a source, `forwarding_cache_timeout` value should be added to the configuration file, e.g.: + +```json +"forwarding_cache_timeout": 0.250, +``` + +`forwarding_cache_timeout` defines for how long (in seconds) a cache entry will be valid after being placed into the cache. +Zero value turns off the cache feature. diff --git a/docs/examples/config/example-config.json b/docs/examples/config/example-config.json index eb28aebf..64c6885b 100644 --- a/docs/examples/config/example-config.json +++ b/docs/examples/config/example-config.json @@ -35,6 +35,7 @@ "grpc_port": "50051" } ], + "forwarding_cache_timeout": 0.250, // in seconds, could be 0, which means no cache "dos_guard": { // Comma-separated list of IPs to exclude from rate limiting "whitelist": [ diff --git a/src/etl/ETLState.hpp b/src/etl/ETLState.hpp index d7bba1a5..15104c00 100644 --- a/src/etl/ETLState.hpp +++ b/src/etl/ETLState.hpp @@ -44,7 +44,7 @@ struct ETLState { */ template static std::optional - fetchETLStateFromSource(Forward const& source) noexcept + fetchETLStateFromSource(Forward& source) noexcept { auto const serverInfoRippled = data::synchronous([&source](auto yield) { return source.forwardToRippled({{"command", "server_info"}}, std::nullopt, yield); diff --git a/src/etl/LoadBalancer.cpp b/src/etl/LoadBalancer.cpp index 67a91113..f16c5b98 100644 --- a/src/etl/LoadBalancer.cpp +++ b/src/etl/LoadBalancer.cpp @@ -70,6 +70,13 @@ LoadBalancer::LoadBalancer( std::shared_ptr validatedLedgers ) { + auto const forwardingCacheTimeout = config.valueOr("forwarding_cache_timeout", 0.f); + if (forwardingCacheTimeout > 0.f) { + forwardingCache_ = impl::ForwardingCache{ + std::chrono::duration_cast(std::chrono::duration{forwardingCacheTimeout}) + }; + } + static constexpr std::uint32_t MAX_DOWNLOAD = 256; if (auto value = config.maybeValue("num_markers"); value) { downloadRanges_ = std::clamp(*value, 1u, MAX_DOWNLOAD); @@ -99,7 +106,8 @@ LoadBalancer::LoadBalancer( if (not hasForwardingSource_) chooseForwardingSource(); }, - [this]() { chooseForwardingSource(); } + [this]() { chooseForwardingSource(); }, + [this]() { forwardingCache_->invalidate(); } ); // checking etl node validity @@ -193,22 +201,34 @@ LoadBalancer::forwardToRippled( boost::json::object const& request, std::optional const& clientIp, boost::asio::yield_context yield -) const +) { + if (forwardingCache_) { + if (auto cachedResponse = forwardingCache_->get(request); cachedResponse) { + return cachedResponse; + } + } + std::size_t sourceIdx = 0; if (!sources_.empty()) sourceIdx = util::Random::uniform(0ul, sources_.size() - 1); auto numAttempts = 0u; + std::optional response; while (numAttempts < sources_.size()) { - if (auto res = sources_[sourceIdx].forwardToRippled(request, clientIp, yield)) - return res; + if (auto res = sources_[sourceIdx].forwardToRippled(request, clientIp, yield)) { + response = std::move(res); + break; + } sourceIdx = (sourceIdx + 1) % sources_.size(); ++numAttempts; } + if (response and forwardingCache_ and not response->contains("error")) + forwardingCache_->put(request, *response); + return {}; } diff --git a/src/etl/LoadBalancer.hpp b/src/etl/LoadBalancer.hpp index e601b221..b11aaa0b 100644 --- a/src/etl/LoadBalancer.hpp +++ b/src/etl/LoadBalancer.hpp @@ -23,6 +23,7 @@ #include "etl/ETLHelpers.hpp" #include "etl/ETLState.hpp" #include "etl/Source.hpp" +#include "etl/impl/ForwardingCache.hpp" #include "feed/SubscriptionManager.hpp" #include "util/config/Config.hpp" #include "util/log/Logger.hpp" @@ -72,10 +73,12 @@ private: static constexpr std::uint32_t DEFAULT_DOWNLOAD_RANGES = 16; util::Logger log_{"ETL"}; + // Forwarding cache must be destroyed after sources because sources have a callnack to invalidate cache + std::optional forwardingCache_; std::vector sources_; std::optional etlState_; std::uint32_t downloadRanges_ = - DEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading intial ledger */ + DEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading initial ledger */ std::atomic_bool hasForwardingSource_{false}; public: @@ -164,7 +167,7 @@ public: boost::json::object const& request, std::optional const& clientIp, boost::asio::yield_context yield - ) const; + ); /** * @brief Return state of ETL nodes. diff --git a/src/etl/Source.cpp b/src/etl/Source.cpp index f87213ff..ed0c3e87 100644 --- a/src/etl/Source.cpp +++ b/src/etl/Source.cpp @@ -26,7 +26,9 @@ #include +#include #include +#include #include #include @@ -42,13 +44,15 @@ make_Source( std::shared_ptr subscriptions, std::shared_ptr validatedLedgers, Source::OnDisconnectHook onDisconnect, - Source::OnConnectHook onConnect + Source::OnConnectHook onConnect, + Source::OnLedgerClosedHook onLedgerClosed ) { auto const ip = config.valueOr("ip", {}); auto const wsPort = config.valueOr("ws_port", {}); auto const grpcPort = config.valueOr("grpc_port", {}); + impl::ForwardingSource forwardingSource{ip, wsPort}; impl::GrpcSource grpcSource{ip, grpcPort, std::move(backend)}; auto subscriptionSource = std::make_unique( ioc, @@ -57,9 +61,9 @@ make_Source( std::move(validatedLedgers), std::move(subscriptions), std::move(onConnect), - std::move(onDisconnect) + std::move(onDisconnect), + std::move(onLedgerClosed) ); - impl::ForwardingSource forwardingSource{ip, wsPort}; return Source{ ip, wsPort, grpcPort, std::move(grpcSource), std::move(subscriptionSource), std::move(forwardingSource) diff --git a/src/etl/Source.hpp b/src/etl/Source.hpp index ea5205a7..b4cc7c6f 100644 --- a/src/etl/Source.hpp +++ b/src/etl/Source.hpp @@ -68,6 +68,7 @@ class SourceImpl { public: using OnConnectHook = impl::SubscriptionSource::OnConnectHook; using OnDisconnectHook = impl::SubscriptionSource::OnDisconnectHook; + using OnLedgerClosedHook = impl::SubscriptionSource::OnLedgerClosedHook; /** * @brief Construct a new SourceImpl object @@ -80,7 +81,7 @@ public: * @param forwardingSource The forwarding source */ template - requires std::is_same_v && + requires std::is_same_v and std::is_same_v SourceImpl( std::string ip, @@ -250,7 +251,8 @@ make_Source( std::shared_ptr subscriptions, std::shared_ptr validatedLedgers, Source::OnDisconnectHook onDisconnect, - Source::OnConnectHook onConnect + Source::OnConnectHook onConnect, + Source::OnLedgerClosedHook onLedgerClosed ); } // namespace etl diff --git a/src/etl/impl/ForwardingCache.cpp b/src/etl/impl/ForwardingCache.cpp new file mode 100644 index 00000000..f1cdeed6 --- /dev/null +++ b/src/etl/impl/ForwardingCache.cpp @@ -0,0 +1,134 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "etl/impl/ForwardingCache.hpp" + +#include "util/Assert.hpp" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace etl::impl { + +namespace { + +std::optional +getCommand(boost::json::object const& request) +{ + if (not request.contains("command")) { + return std::nullopt; + } + return boost::json::value_to(request.at("command")); +} + +} // namespace + +void +CacheEntry::put(boost::json::object response) +{ + response_ = std::move(response); + lastUpdated_ = std::chrono::steady_clock::now(); +} + +std::optional +CacheEntry::get() const +{ + return response_; +} + +std::chrono::steady_clock::time_point +CacheEntry::lastUpdated() const +{ + return lastUpdated_; +} + +void +CacheEntry::invalidate() +{ + response_.reset(); +} + +std::unordered_set const + ForwardingCache::CACHEABLE_COMMANDS{"server_info", "server_state", "server_definitions", "fee", "ledger_closed"}; + +ForwardingCache::ForwardingCache(std::chrono::steady_clock::duration const cacheTimeout) : cacheTimeout_{cacheTimeout} +{ + for (auto const& command : CACHEABLE_COMMANDS) { + cache_.emplace(command, CacheEntry{}); + } +} + +bool +ForwardingCache::shouldCache(boost::json::object const& request) +{ + auto const command = getCommand(request); + return command.has_value() and CACHEABLE_COMMANDS.contains(*command); +} + +std::optional +ForwardingCache::get(boost::json::object const& request) const +{ + auto const command = getCommand(request); + + if (not command.has_value()) { + return std::nullopt; + } + + auto it = cache_.find(*command); + if (it == cache_.end()) + return std::nullopt; + + auto const& entry = it->second.lock(); + if (std::chrono::steady_clock::now() - entry->lastUpdated() > cacheTimeout_) + return std::nullopt; + + return entry->get(); +} + +void +ForwardingCache::put(boost::json::object const& request, boost::json::object const& response) +{ + auto const command = getCommand(request); + if (not command.has_value() or not shouldCache(request)) + return; + + ASSERT(cache_.contains(*command), "Command is not in the cache: {}", *command); + + auto entry = cache_[*command].lock(); + entry->put(response); +} + +void +ForwardingCache::invalidate() +{ + for (auto& [_, entry] : cache_) { + auto entryLock = entry.lock(); + entryLock->invalidate(); + } +} + +} // namespace etl::impl diff --git a/src/etl/impl/ForwardingCache.hpp b/src/etl/impl/ForwardingCache.hpp new file mode 100644 index 00000000..62ee0629 --- /dev/null +++ b/src/etl/impl/ForwardingCache.hpp @@ -0,0 +1,125 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "util/Mutex.hpp" + +#include + +#include +#include +#include +#include +#include +#include + +namespace etl::impl { + +/** + * @brief A class to store a cache entry. + */ +class CacheEntry { + std::chrono::steady_clock::time_point lastUpdated_; + std::optional response_; + +public: + /** + * @brief Put a response into the cache + * + * @param response The response to store + */ + void + put(boost::json::object response); + + /** + * @brief Get the response from the cache + * + * @return The response + */ + std::optional + get() const; + + /** + * @brief Get the last time the cache was updated + * + * @return The last time the cache was updated + */ + std::chrono::steady_clock::time_point + lastUpdated() const; + + /** + * @brief Invalidate the cache entry + */ + void + invalidate(); +}; + +/** + * @brief A class to store a cache of forwarding responses + */ +class ForwardingCache { + std::chrono::steady_clock::duration cacheTimeout_; + std::unordered_map> cache_; + +public: + static std::unordered_set const CACHEABLE_COMMANDS; + + /** + * @brief Construct a new Forwarding Cache object + * + * @param cacheTimeout The time for cache entries to expire + */ + ForwardingCache(std::chrono::steady_clock::duration cacheTimeout); + + /** + * @brief Check if a request should be cached + * + * @param request The request to check + * @return true if the request should be cached and false otherwise + */ + [[nodiscard]] static bool + shouldCache(boost::json::object const& request); + + /** + * @brief Get a response from the cache + * + * @param request The request to get the response for + * @return The response if it exists or std::nullopt otherwise + */ + [[nodiscard]] std::optional + get(boost::json::object const& request) const; + + /** + * @brief Put a response into the cache if the request should be cached + * + * @param request The request to store the response for + * @param response The response to store + */ + void + put(boost::json::object const& request, boost::json::object const& response); + + /** + * @brief Invalidate all entries in the cache + */ + void + invalidate(); +}; + +} // namespace etl::impl diff --git a/src/etl/impl/ForwardingSource.cpp b/src/etl/impl/ForwardingSource.cpp index 0d4487af..5afb2f34 100644 --- a/src/etl/impl/ForwardingSource.cpp +++ b/src/etl/impl/ForwardingSource.cpp @@ -92,6 +92,7 @@ ForwardingSource::forwardToRippled( auto responseObject = parsedResponse.as_object(); responseObject["forwarded"] = true; + return responseObject; } diff --git a/src/etl/impl/ForwardingSource.hpp b/src/etl/impl/ForwardingSource.hpp index 1c2acf42..b7752773 100644 --- a/src/etl/impl/ForwardingSource.hpp +++ b/src/etl/impl/ForwardingSource.hpp @@ -34,6 +34,7 @@ namespace etl::impl { class ForwardingSource { util::Logger log_; util::requests::WsConnectionBuilder connectionBuilder_; + static constexpr std::chrono::seconds CONNECTION_TIMEOUT{3}; public: diff --git a/src/etl/impl/SubscriptionSource.cpp b/src/etl/impl/SubscriptionSource.cpp index 4f970a89..d2d778bc 100644 --- a/src/etl/impl/SubscriptionSource.cpp +++ b/src/etl/impl/SubscriptionSource.cpp @@ -197,6 +197,8 @@ SubscriptionSource::handleMessage(std::string const& message) auto validatedLedgers = boost::json::value_to(object.at(JS(validated_ledgers))); setValidatedRange(std::move(validatedLedgers)); } + if (isForwarding_) + onLedgerClosed_(); } else { if (isForwarding_) { diff --git a/src/etl/impl/SubscriptionSource.hpp b/src/etl/impl/SubscriptionSource.hpp index 501d462e..81d6f3b0 100644 --- a/src/etl/impl/SubscriptionSource.hpp +++ b/src/etl/impl/SubscriptionSource.hpp @@ -52,6 +52,7 @@ class SubscriptionSource { public: using OnConnectHook = std::function; using OnDisconnectHook = std::function; + using OnLedgerClosedHook = std::function; private: util::Logger log_; @@ -72,6 +73,7 @@ private: OnConnectHook onConnect_; OnDisconnectHook onDisconnect_; + OnLedgerClosedHook onLedgerClosed_; std::atomic_bool isConnected_{false}; std::atomic_bool stop_{false}; @@ -97,6 +99,9 @@ public: * @param validatedLedgers The network validated ledgers object * @param subscriptions The subscription manager object * @param onDisconnect The onDisconnect hook. Called when the connection is lost + * @param onNewLedger The onNewLedger hook. Called when a new ledger is received + * @param onLedgerClosed The onLedgerClosed hook. Called when the ledger is closed but only if the source is + * forwarding * @param connectionTimeout The connection timeout. Defaults to 30 seconds * @param retryDelay The retry delay. Defaults to 1 second */ @@ -109,6 +114,7 @@ public: std::shared_ptr subscriptions, OnConnectHook onConnect, OnDisconnectHook onDisconnect, + OnLedgerClosedHook onLedgerClosed, std::chrono::steady_clock::duration const connectionTimeout = CONNECTION_TIMEOUT, std::chrono::steady_clock::duration const retryDelay = RETRY_DELAY ) @@ -119,6 +125,7 @@ public: , retry_(util::makeRetryExponentialBackoff(retryDelay, RETRY_MAX_DELAY, strand_)) , onConnect_(std::move(onConnect)) , onDisconnect_(std::move(onDisconnect)) + , onLedgerClosed_(std::move(onLedgerClosed)) { wsConnectionBuilder_.addHeader({boost::beast::http::field::user_agent, "clio-client"}) .addHeader({"X-User", "clio-client"}) diff --git a/src/util/Mutex.hpp b/src/util/Mutex.hpp index bb43909e..e507f01f 100644 --- a/src/util/Mutex.hpp +++ b/src/util/Mutex.hpp @@ -24,17 +24,19 @@ namespace util { -template +template class Mutex; /** * @brief A lock on a mutex that provides access to the protected data. * * @tparam ProtectedDataType data type to hold + * @tparam LockType type of lock + * @tparam MutexType type of mutex */ -template +template typename LockType, typename MutexType> class Lock { - std::scoped_lock lock_; + LockType lock_; ProtectedDataType& data_; public: @@ -77,9 +79,9 @@ public: /** @endcond */ private: - friend class Mutex>; + friend class Mutex, MutexType>; - explicit Lock(std::mutex& mutex, ProtectedDataType& data) : lock_(mutex), data_(data) + Lock(MutexType& mutex, ProtectedDataType& data) : lock_(mutex), data_(data) { } }; @@ -88,10 +90,11 @@ private: * @brief A container for data that is protected by a mutex. Inspired by Mutex in Rust. * * @tparam ProtectedDataType data type to hold + * @tparam MutexType type of mutex */ -template +template class Mutex { - mutable std::mutex mutex_; + mutable MutexType mutex_; ProtectedDataType data_; public: @@ -123,23 +126,27 @@ public: /** * @brief Lock the mutex and get a lock object allowing access to the protected data * + * @tparam LockType The type of lock to use * @return A lock on the mutex and a reference to the protected data */ - Lock + template