From 46514c8fe96861e8fcda00b5de67e95800c24666 Mon Sep 17 00:00:00 2001 From: Sergey Kuznetsov Date: Thu, 17 Apr 2025 16:44:53 +0100 Subject: [PATCH] feat: More efficient cache (#1997) Fixes #1473. --- src/etl/LoadBalancer.cpp | 98 ++++-- src/etl/LoadBalancer.hpp | 11 +- src/etlng/LoadBalancer.cpp | 98 ++++-- src/etlng/LoadBalancer.hpp | 11 +- src/etlng/LoadBalancerInterface.hpp | 2 +- src/rpc/RPCEngine.hpp | 133 +++++--- src/util/BlockingCache.hpp | 221 +++++++++++++ src/util/ResponseExpirationCache.cpp | 83 +++-- src/util/ResponseExpirationCache.hpp | 128 ++++---- tests/common/util/MockLoadBalancer.hpp | 2 +- tests/unit/CMakeLists.txt | 1 + tests/unit/etl/LoadBalancerTests.cpp | 4 +- tests/unit/etlng/LoadBalancerTests.cpp | 4 +- tests/unit/rpc/RPCEngineTests.cpp | 11 +- tests/unit/util/BlockingCacheTests.cpp | 252 +++++++++++++++ .../util/ResponseExpirationCacheTests.cpp | 293 ++++++++++++++++-- 16 files changed, 1092 insertions(+), 260 deletions(-) create mode 100644 src/util/BlockingCache.hpp create mode 100644 tests/unit/util/BlockingCacheTests.cpp diff --git a/src/etl/LoadBalancer.cpp b/src/etl/LoadBalancer.cpp index fa0010a0..81b84f95 100644 --- a/src/etl/LoadBalancer.cpp +++ b/src/etl/LoadBalancer.cpp @@ -54,6 +54,7 @@ #include #include #include +#include #include using namespace util::config; @@ -227,7 +228,7 @@ LoadBalancer::fetchLedger( return response; } -std::expected +std::expected LoadBalancer::forwardToRippled( boost::json::object const& request, std::optional const& clientIp, @@ -239,40 +240,37 @@ LoadBalancer::forwardToRippled( return std::unexpected{rpc::ClioError::RpcCommandIsMissing}; auto const cmd = boost::json::value_to(request.at("command")); - if (forwardingCache_) { - if (auto cachedResponse = forwardingCache_->get(cmd); cachedResponse) { - return std::move(cachedResponse).value(); + + if (forwardingCache_ and forwardingCache_->shouldCache(cmd)) { + auto updater = + [this, &request, &clientIp, isAdmin](boost::asio::yield_context yield + ) -> std::expected { + auto result = forwardToRippledImpl(request, clientIp, isAdmin, yield); + if (result.has_value()) { + return util::ResponseExpirationCache::EntryData{ + .lastUpdated = std::chrono::steady_clock::now(), .response = std::move(result).value() + }; + } + return std::unexpected{ + util::ResponseExpirationCache::Error{.status = rpc::Status{result.error()}, .warnings = {}} + }; + }; + + auto result = forwardingCache_->getOrUpdate( + yield, + cmd, + std::move(updater), + [](util::ResponseExpirationCache::EntryData const& entry) { return not entry.response.contains("error"); } + ); + if (result.has_value()) { + return std::move(result).value(); } + auto const combinedError = result.error().status.code; + ASSERT(std::holds_alternative(combinedError), "There could be only ClioError here"); + return std::unexpected{std::get(combinedError)}; } - ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests."); - std::size_t sourceIdx = util::Random::uniform(0ul, sources_.size() - 1); - - auto numAttempts = 0u; - - auto xUserValue = isAdmin ? kADMIN_FORWARDING_X_USER_VALUE : kUSER_FORWARDING_X_USER_VALUE; - - std::optional response; - rpc::ClioError error = rpc::ClioError::EtlConnectionError; - while (numAttempts < sources_.size()) { - auto res = sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield); - if (res) { - response = std::move(res).value(); - break; - } - error = std::max(error, res.error()); // Choose the best result between all sources - - sourceIdx = (sourceIdx + 1) % sources_.size(); - ++numAttempts; - } - - if (response) { - if (forwardingCache_ and not response->contains("error")) - forwardingCache_->put(cmd, *response); - return std::move(response).value(); - } - - return std::unexpected{error}; + return forwardToRippledImpl(request, clientIp, isAdmin, yield); } boost::json::value @@ -363,4 +361,40 @@ LoadBalancer::chooseForwardingSource() } } +std::expected +LoadBalancer::forwardToRippledImpl( + boost::json::object const& request, + std::optional const& clientIp, + bool const isAdmin, + boost::asio::yield_context yield +) +{ + ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests."); + std::size_t sourceIdx = util::Random::uniform(0ul, sources_.size() - 1); + + auto numAttempts = 0u; + + auto xUserValue = isAdmin ? kADMIN_FORWARDING_X_USER_VALUE : kUSER_FORWARDING_X_USER_VALUE; + + std::optional response; + rpc::ClioError error = rpc::ClioError::EtlConnectionError; + while (numAttempts < sources_.size()) { + auto res = sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield); + if (res) { + response = std::move(res).value(); + break; + } + error = std::max(error, res.error()); // Choose the best result between all sources + + sourceIdx = (sourceIdx + 1) % sources_.size(); + ++numAttempts; + } + + if (response.has_value()) { + return std::move(response).value(); + } + + return std::unexpected{error}; +} + } // namespace etl diff --git a/src/etl/LoadBalancer.hpp b/src/etl/LoadBalancer.hpp index 16963668..b8bf48e2 100644 --- a/src/etl/LoadBalancer.hpp +++ b/src/etl/LoadBalancer.hpp @@ -44,6 +44,7 @@ #include #include +#include #include #include #include @@ -218,7 +219,7 @@ public: * @param yield The coroutine context * @return Response received from rippled node as JSON object on success or error on failure */ - std::expected + std::expected forwardToRippled( boost::json::object const& request, std::optional const& clientIp, @@ -264,6 +265,14 @@ private: */ void chooseForwardingSource(); + + std::expected + forwardToRippledImpl( + boost::json::object const& request, + std::optional const& clientIp, + bool isAdmin, + boost::asio::yield_context yield + ); }; } // namespace etl diff --git a/src/etlng/LoadBalancer.cpp b/src/etlng/LoadBalancer.cpp index d6b96c7f..7acbcd14 100644 --- a/src/etlng/LoadBalancer.cpp +++ b/src/etlng/LoadBalancer.cpp @@ -55,6 +55,7 @@ #include #include #include +#include #include using namespace util::config; @@ -231,7 +232,7 @@ LoadBalancer::fetchLedger( return response; } -std::expected +std::expected LoadBalancer::forwardToRippled( boost::json::object const& request, std::optional const& clientIp, @@ -243,40 +244,37 @@ LoadBalancer::forwardToRippled( return std::unexpected{rpc::ClioError::RpcCommandIsMissing}; auto const cmd = boost::json::value_to(request.at("command")); - if (forwardingCache_) { - if (auto cachedResponse = forwardingCache_->get(cmd); cachedResponse) { - return std::move(cachedResponse).value(); + + if (forwardingCache_ and forwardingCache_->shouldCache(cmd)) { + auto updater = + [this, &request, &clientIp, isAdmin](boost::asio::yield_context yield + ) -> std::expected { + auto result = forwardToRippledImpl(request, clientIp, isAdmin, yield); + if (result.has_value()) { + return util::ResponseExpirationCache::EntryData{ + .lastUpdated = std::chrono::steady_clock::now(), .response = std::move(result).value() + }; + } + return std::unexpected{ + util::ResponseExpirationCache::Error{.status = rpc::Status{result.error()}, .warnings = {}} + }; + }; + + auto result = forwardingCache_->getOrUpdate( + yield, + cmd, + std::move(updater), + [](util::ResponseExpirationCache::EntryData const& entry) { return not entry.response.contains("error"); } + ); + if (result.has_value()) { + return std::move(result).value(); } + auto const combinedError = result.error().status.code; + ASSERT(std::holds_alternative(combinedError), "There could be only ClioError here"); + return std::unexpected{std::get(combinedError)}; } - ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests."); - std::size_t sourceIdx = util::Random::uniform(0ul, sources_.size() - 1); - - auto numAttempts = 0u; - - auto xUserValue = isAdmin ? kADMIN_FORWARDING_X_USER_VALUE : kUSER_FORWARDING_X_USER_VALUE; - - std::optional response; - rpc::ClioError error = rpc::ClioError::EtlConnectionError; - while (numAttempts < sources_.size()) { - auto res = sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield); - if (res) { - response = std::move(res).value(); - break; - } - error = std::max(error, res.error()); // Choose the best result between all sources - - sourceIdx = (sourceIdx + 1) % sources_.size(); - ++numAttempts; - } - - if (response) { - if (forwardingCache_ and not response->contains("error")) - forwardingCache_->put(cmd, *response); - return std::move(response).value(); - } - - return std::unexpected{error}; + return forwardToRippledImpl(request, clientIp, isAdmin, yield); } boost::json::value @@ -367,4 +365,40 @@ LoadBalancer::chooseForwardingSource() } } +std::expected +LoadBalancer::forwardToRippledImpl( + boost::json::object const& request, + std::optional const& clientIp, + bool isAdmin, + boost::asio::yield_context yield +) +{ + ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests."); + std::size_t sourceIdx = util::Random::uniform(0ul, sources_.size() - 1); + + auto numAttempts = 0u; + + auto xUserValue = isAdmin ? kADMIN_FORWARDING_X_USER_VALUE : kUSER_FORWARDING_X_USER_VALUE; + + std::optional response; + rpc::ClioError error = rpc::ClioError::EtlConnectionError; + while (numAttempts < sources_.size()) { + auto res = sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield); + if (res) { + response = std::move(res).value(); + break; + } + error = std::max(error, res.error()); // Choose the best result between all sources + + sourceIdx = (sourceIdx + 1) % sources_.size(); + ++numAttempts; + } + + if (response.has_value()) { + return std::move(response).value(); + } + + return std::unexpected{error}; +} + } // namespace etlng diff --git a/src/etlng/LoadBalancer.hpp b/src/etlng/LoadBalancer.hpp index ac01100d..fc84ad54 100644 --- a/src/etlng/LoadBalancer.hpp +++ b/src/etlng/LoadBalancer.hpp @@ -44,6 +44,7 @@ #include #include +#include #include #include #include @@ -220,7 +221,7 @@ public: * @param yield The coroutine context * @return Response received from rippled node as JSON object on success or error on failure */ - std::expected + std::expected forwardToRippled( boost::json::object const& request, std::optional const& clientIp, @@ -266,6 +267,14 @@ private: */ void chooseForwardingSource(); + + std::expected + forwardToRippledImpl( + boost::json::object const& request, + std::optional const& clientIp, + bool isAdmin, + boost::asio::yield_context yield + ); }; } // namespace etlng diff --git a/src/etlng/LoadBalancerInterface.hpp b/src/etlng/LoadBalancerInterface.hpp index 20623d45..200bbb3f 100644 --- a/src/etlng/LoadBalancerInterface.hpp +++ b/src/etlng/LoadBalancerInterface.hpp @@ -115,7 +115,7 @@ public: * @param yield The coroutine context * @return Response received from rippled node as JSON object on success or error on failure */ - virtual std::expected + virtual std::expected forwardToRippled( boost::json::object const& request, std::optional const& clientIp, diff --git a/src/rpc/RPCEngine.hpp b/src/rpc/RPCEngine.hpp index fe46db10..99df4409 100644 --- a/src/rpc/RPCEngine.hpp +++ b/src/rpc/RPCEngine.hpp @@ -27,6 +27,7 @@ #include "rpc/common/HandlerProvider.hpp" #include "rpc/common/Types.hpp" #include "rpc/common/impl/ForwardingProxy.hpp" +#include "util/OverloadSet.hpp" #include "util/ResponseExpirationCache.hpp" #include "util/log/Logger.hpp" #include "web/Context.hpp" @@ -35,6 +36,7 @@ #include #include #include +#include #include #include #include @@ -156,55 +158,51 @@ public: return forwardingProxy_.forward(ctx); } - if (not ctx.isAdmin and responseCache_) { - if (auto res = responseCache_->get(ctx.method); res.has_value()) - return Result{std::move(res).value()}; - } - - if (backend_->isTooBusy()) { - LOG(log_.error()) << "Database is too busy. Rejecting request"; - notifyTooBusy(); // TODO: should we add ctx.method if we have it? - return Result{Status{RippledError::rpcTOO_BUSY}}; - } - - auto const method = handlerProvider_->getHandler(ctx.method); - if (!method) { - notifyUnknownCommand(); - return Result{Status{RippledError::rpcUNKNOWN_COMMAND}}; - } - - try { - LOG(perfLog_.debug()) << ctx.tag() << " start executing rpc `" << ctx.method << '`'; - - auto const context = Context{ - .yield = ctx.yield, - .session = ctx.session, - .isAdmin = ctx.isAdmin, - .clientIp = ctx.clientIp, - .apiVersion = ctx.apiVersion + if (not ctx.isAdmin and responseCache_ and responseCache_->shouldCache(ctx.method)) { + auto updater = + [this, &ctx](boost::asio::yield_context + ) -> std::expected { + auto result = buildResponseImpl(ctx); + auto extracted = std::visit( + util::OverloadSet{ + [&result](Status status + ) -> std::expected { + return std::unexpected{util::ResponseExpirationCache::Error{ + .status = std::move(status), .warnings = std::move(result.warnings) + }}; + }, + [](boost::json::object obj + ) -> std::expected { return obj; } + }, + std::move(result.response) + ); + if (extracted.has_value()) { + return util::ResponseExpirationCache::EntryData{ + .lastUpdated = std::chrono::steady_clock::now(), .response = std::move(extracted).value() + }; + } + return std::unexpected{std::move(extracted).error()}; }; - auto v = (*method).process(ctx.params, context); - LOG(perfLog_.debug()) << ctx.tag() << " finish executing rpc `" << ctx.method << '`'; - - if (not v) { - notifyErrored(ctx.method); - } else if (not ctx.isAdmin and responseCache_) { - responseCache_->put(ctx.method, v.result->as_object()); + auto result = responseCache_->getOrUpdate( + ctx.yield, + ctx.method, + std::move(updater), + [&ctx](util::ResponseExpirationCache::EntryData const& entry) { + return not ctx.isAdmin and not entry.response.contains("error"); + } + ); + if (result.has_value()) { + return Result{std::move(result).value()}; } - return Result{std::move(v)}; - } catch (data::DatabaseTimeout const& t) { - LOG(log_.error()) << "Database timeout"; - notifyTooBusy(); - - return Result{Status{RippledError::rpcTOO_BUSY}}; - } catch (std::exception const& ex) { - LOG(log_.error()) << ctx.tag() << "Caught exception: " << ex.what(); - notifyInternalError(); - - return Result{Status{RippledError::rpcINTERNAL}}; + auto error = std::move(result).error(); + Result errorResult{std::move(error.status)}; + errorResult.warnings = std::move(error.warnings); + return errorResult; } + + return buildResponseImpl(ctx); } /** @@ -317,6 +315,53 @@ private: { return handlerProvider_->contains(method) || forwardingProxy_.isProxied(method); } + + Result + buildResponseImpl(web::Context const& ctx) + { + if (backend_->isTooBusy()) { + LOG(log_.error()) << "Database is too busy. Rejecting request"; + notifyTooBusy(); // TODO: should we add ctx.method if we have it? + return Result{Status{RippledError::rpcTOO_BUSY}}; + } + + auto const method = handlerProvider_->getHandler(ctx.method); + if (!method) { + notifyUnknownCommand(); + return Result{Status{RippledError::rpcUNKNOWN_COMMAND}}; + } + + try { + LOG(perfLog_.debug()) << ctx.tag() << " start executing rpc `" << ctx.method << '`'; + + auto const context = Context{ + .yield = ctx.yield, + .session = ctx.session, + .isAdmin = ctx.isAdmin, + .clientIp = ctx.clientIp, + .apiVersion = ctx.apiVersion + }; + auto v = (*method).process(ctx.params, context); + + LOG(perfLog_.debug()) << ctx.tag() << " finish executing rpc `" << ctx.method << '`'; + + if (not v) { + notifyErrored(ctx.method); + } + + return Result{std::move(v)}; + } catch (data::DatabaseTimeout const& t) { + LOG(log_.error()) << "Database timeout"; + notifyTooBusy(); + + return Result{Status{RippledError::rpcTOO_BUSY}}; + } catch (std::exception const& ex) { + LOG(log_.error()) << ctx.tag() << "Caught exception: " << ex.what(); + notifyInternalError(); + + return Result{Status{RippledError::rpcINTERNAL}}; + } + } }; } // namespace rpc diff --git a/src/util/BlockingCache.hpp b/src/util/BlockingCache.hpp new file mode 100644 index 00000000..091be39c --- /dev/null +++ b/src/util/BlockingCache.hpp @@ -0,0 +1,221 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "util/Assert.hpp" +#include "util/Mutex.hpp" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace util { + +/** + * @brief A thread-safe cache that blocks getting operations until the cache is updated + * + * @tparam ValueType The type of value to be cached + * @tparam ErrorType The type of error that can occur during updates + */ +template + requires(not std::same_as) +class BlockingCache { +public: + /** + * @brief Possible states of the cache + */ + enum class State { NoValue, Updating, HasValue }; + +private: + std::atomic state_{State::NoValue}; + util::Mutex, std::shared_mutex> value_; + boost::signals2::signal)> updateFinished_; + +public: + /** + * @brief Default constructor - creates an empty cache + */ + BlockingCache() = default; + + /** + * @brief Construct a cache with an initial value + * @param initialValue The value to initialize the cache with + */ + explicit BlockingCache(ValueType initialValue) : state_{State::HasValue}, value_(std::move(initialValue)) + { + } + + BlockingCache(BlockingCache&&) = delete; + BlockingCache(BlockingCache const&) = delete; + BlockingCache& + operator=(BlockingCache&&) = delete; + BlockingCache& + operator=(BlockingCache const&) = delete; + + /** + * @brief Function type for cache update operations + * @details Called when the cache needs to be populated or refreshed + */ + using Updater = std::function(boost::asio::yield_context)>; + + /** + * @brief Function type to verify if a value should be cached + * @details Returns true if the value should be stored in the cache + */ + using Verifier = std::function; + + /** + * @brief Asynchronously get a value from the cache, updating if necessary + * + * @param yield The asio yield context for coroutine suspension + * @param updater Function to generate a new value if needed + * @param verifier Function to validate whether a value should be cached + * @return std::expected The cached value or an error + * + * Depending on the current cache state, this will either: + * - Return the cached value if it's already present + * - Wait for an ongoing update to complete + * - Trigger a new update if the cache is empty + */ + [[nodiscard]] std::expected + asyncGet(boost::asio::yield_context yield, Updater updater, Verifier verifier) + { + switch (state_) { + case State::Updating: { + return wait(yield, std::move(updater), std::move(verifier)); + } + case State::HasValue: { + auto const value = value_.template lock(); + ASSERT(value->has_value(), "Value should be presented when the cache is full"); + return value->value(); + } + case State::NoValue: { + return update(yield, std::move(updater), std::move(verifier)); + } + }; + std::unreachable(); + } + + /** + * @brief Force an update of the cache value + * + * @param yield The ASIO yield context for coroutine suspension + * @param updater Function to generate a new value + * @param verifier Function to validate whether a value should be cached + * @return std::expected The new value or an error + * + * Initiates a cache update operation regardless of current state. + * If another update is already in progress, waits for it to complete. + */ + [[nodiscard]] std::expected + update(boost::asio::yield_context yield, Updater updater, Verifier verifier) + { + if (state_ == State::Updating) { + return asyncGet(yield, std::move(updater), std::move(verifier)); + } + state_ = State::Updating; + + auto const result = updater(yield); + auto const shouldBeCached = result.has_value() and verifier(result.value()); + + if (shouldBeCached) { + value_.lock().get() = result.value(); + state_ = State::HasValue; + } else { + state_ = State::NoValue; + value_.lock().get() = std::nullopt; + } + + updateFinished_(result); + return result; + } + + /** + * @brief Invalidates the currently cached value if present + * + * Clears the cache and sets its state to Empty. + * Has no effect if the cache is already empty or being updated. + */ + void + invalidate() + { + if (state_ == State::HasValue) { + state_ = State::NoValue; + value_.lock().get() = std::nullopt; + } + } + + /** + * @brief Returns the current state of the cache + * @return Current cache state (Empty, Updating, or Full) + */ + [[nodiscard]] State + state() const + { + return state_; + } + +private: + /** + * @brief Wait for an ongoing update to complete + * + * @param yield The ASIO yield context for coroutine suspension + * @param updater Function to generate a new value if needed + * @param verifier Function to validate whether a value should be cached + * @return std::expected The result of the ongoing update + * + * This method blocks the current coroutine until the ongoing update signals completion. + */ + std::expected + wait(boost::asio::yield_context yield, Updater updater, Verifier verifier) + { + boost::asio::steady_timer timer{yield.get_executor(), boost::asio::steady_timer::duration::max()}; + boost::system::error_code errorCode; + + std::optional> result; + boost::signals2::scoped_connection slot = + updateFinished_.connect([yield, &timer, &result](std::expected value) { + boost::asio::spawn(yield, [&timer, &result, value = std::move(value)](auto&&) { + result = std::move(value); + timer.cancel(); + }); + }); + + if (state_ == State::Updating) { + timer.async_wait(yield[errorCode]); + ASSERT(result.has_value(), "There should be some value after waiting"); + return std::move(result).value(); + } + return asyncGet(yield, std::move(updater), std::move(verifier)); + } +}; + +} // namespace util diff --git a/src/util/ResponseExpirationCache.cpp b/src/util/ResponseExpirationCache.cpp index 3b121e4f..d5924f9f 100644 --- a/src/util/ResponseExpirationCache.cpp +++ b/src/util/ResponseExpirationCache.cpp @@ -21,40 +21,26 @@ #include "util/Assert.hpp" +#include #include #include -#include -#include -#include +#include #include +#include #include namespace util { -void -ResponseExpirationCache::Entry::put(boost::json::object response) +ResponseExpirationCache::ResponseExpirationCache( + std::chrono::steady_clock::duration cacheTimeout, + std::unordered_set const& cmds +) + : cacheTimeout_(cacheTimeout) { - response_ = std::move(response); - lastUpdated_ = std::chrono::steady_clock::now(); -} - -std::optional -ResponseExpirationCache::Entry::get() const -{ - return response_; -} - -std::chrono::steady_clock::time_point -ResponseExpirationCache::Entry::lastUpdated() const -{ - return lastUpdated_; -} - -void -ResponseExpirationCache::Entry::invalidate() -{ - response_.reset(); + for (auto const& command : cmds) { + cache_.emplace(command, std::make_unique()); + } } bool @@ -63,38 +49,41 @@ ResponseExpirationCache::shouldCache(std::string const& cmd) return cache_.contains(cmd); } -std::optional -ResponseExpirationCache::get(std::string const& cmd) const +std::expected +ResponseExpirationCache::getOrUpdate( + boost::asio::yield_context yield, + std::string const& cmd, + Updater updater, + Verifier verifier +) { auto it = cache_.find(cmd); - if (it == cache_.end()) - return std::nullopt; + ASSERT(it != cache_.end(), "Can't get a value which is not in the cache"); - auto const& entry = it->second.lock(); - if (std::chrono::steady_clock::now() - entry->lastUpdated() > cacheTimeout_) - return std::nullopt; + auto& entry = it->second; + { + auto result = entry->asyncGet(yield, updater, verifier); + if (not result.has_value()) { + return std::unexpected{std::move(result).error()}; + } + if (std::chrono::steady_clock::now() - result->lastUpdated < cacheTimeout_) { + return std::move(result)->response; + } + } - return entry->get(); -} - -void -ResponseExpirationCache::put(std::string const& cmd, boost::json::object const& response) -{ - if (not shouldCache(cmd)) - return; - - ASSERT(cache_.contains(cmd), "Command is not in the cache: {}", cmd); - - auto entry = cache_[cmd].lock(); - entry->put(response); + // Force update due to cache timeout + auto result = entry->update(yield, std::move(updater), std::move(verifier)); + if (not result.has_value()) { + return std::unexpected{std::move(result).error()}; + } + return std::move(result)->response; } void ResponseExpirationCache::invalidate() { for (auto& [_, entry] : cache_) { - auto entryLock = entry.lock(); - entryLock->invalidate(); + entry->invalidate(); } } diff --git a/src/util/ResponseExpirationCache.hpp b/src/util/ResponseExpirationCache.hpp index e2cf5f23..f9346f2c 100644 --- a/src/util/ResponseExpirationCache.hpp +++ b/src/util/ResponseExpirationCache.hpp @@ -19,13 +19,15 @@ #pragma once -#include "util/Mutex.hpp" +#include "rpc/Errors.hpp" +#include "util/BlockingCache.hpp" +#include +#include #include #include -#include -#include +#include #include #include #include @@ -34,91 +36,87 @@ namespace util { /** * @brief Cache of requests' responses with TTL support and configurable cachable commands + * + * This class implements a time-based expiration cache for RPC responses. It allows + * caching responses for specified commands and automatically invalidates them after + * a configured timeout period. The cache uses BlockingCache internally to handle + * concurrent access and updates. */ class ResponseExpirationCache { +public: /** - * @brief A class to store a cache entry. + * @brief A data structure to store a cache entry with its timestamp */ - class Entry { - std::chrono::steady_clock::time_point lastUpdated_; - std::optional response_; - - public: - /** - * @brief Put a response into the cache - * - * @param response The response to store - */ - void - put(boost::json::object response); - - /** - * @brief Get the response from the cache - * - * @return The response - */ - std::optional - get() const; - - /** - * @brief Get the last time the cache was updated - * - * @return The last time the cache was updated - */ - std::chrono::steady_clock::time_point - lastUpdated() const; - - /** - * @brief Invalidate the cache entry - */ - void - invalidate(); + struct EntryData { + std::chrono::steady_clock::time_point lastUpdated; ///< When the entry was last updated + boost::json::object response; ///< The cached response data }; - std::chrono::steady_clock::duration cacheTimeout_; - std::unordered_map> cache_; + /** + * @brief A data structure to represent errors that can occur during an update of the cache + */ + struct Error { + rpc::Status status; ///< The status code and message of the error + boost::json::array warnings; ///< Any warnings related to the request - bool - shouldCache(std::string const& cmd); + bool + operator==(Error const&) const = default; + }; + + using CacheEntry = util::BlockingCache; + +private: + std::chrono::steady_clock::duration cacheTimeout_; + std::unordered_map> cache_; public: /** - * @brief Construct a new Cache object + * @brief Construct a new ResponseExpirationCache object * - * @param cacheTimeout The time for cache entries to expire - * @param cmds The commands that should be cached + * @param cacheTimeout The time period after which cached entries expire + * @param cmds The commands that should be cached (requests for other commands won't be cached) */ ResponseExpirationCache( std::chrono::steady_clock::duration cacheTimeout, std::unordered_set const& cmds - ) - : cacheTimeout_(cacheTimeout) - { - for (auto const& command : cmds) { - cache_.emplace(command, Entry{}); - } - } + ); /** - * @brief Get a response from the cache + * @brief Check if the given command should be cached * + * @param cmd The command to check + * @return true if the command should be cached, false otherwise + */ + bool + shouldCache(std::string const& cmd); + + using Updater = CacheEntry::Updater; + using Verifier = CacheEntry::Verifier; + + /** + * @brief Get a cached response or update the cache if necessary + * + * This method returns a cached response if it exists and hasn't expired. + * If the cache entry is expired or doesn't exist, it calls the updater to + * generate a new value. If multiple coroutines request the same entry + * simultaneously, only one updater will be called while others wait. + * + * @note cmd must be one of the commands that are cached. There is an ASSERT() inside the function + * + * @param yield Asio yield context for coroutine suspension * @param cmd The command to get the response for - * @return The response if it exists or std::nullopt otherwise + * @param updater Function to generate the response if not in cache or expired + * @param verifier Function to validate if a response should be cached + * @return The cached or newly generated response, or an error */ - [[nodiscard]] std::optional - get(std::string const& cmd) const; - - /** - * @brief Put a response into the cache if the request should be cached - * - * @param cmd The command to store the response for - * @param response The response to store - */ - void - put(std::string const& cmd, boost::json::object const& response); + [[nodiscard]] std::expected + getOrUpdate(boost::asio::yield_context yield, std::string const& cmd, Updater updater, Verifier verifier); /** * @brief Invalidate all entries in the cache + * + * This causes all cached entries to be cleared, forcing the next access + * to generate new responses. */ void invalidate(); diff --git a/tests/common/util/MockLoadBalancer.hpp b/tests/common/util/MockLoadBalancer.hpp index f0b9dd31..b9989252 100644 --- a/tests/common/util/MockLoadBalancer.hpp +++ b/tests/common/util/MockLoadBalancer.hpp @@ -62,7 +62,7 @@ struct MockNgLoadBalancer : etlng::LoadBalancerInterface { MOCK_METHOD(boost::json::value, toJson, (), (const, override)); MOCK_METHOD(std::optional, getETLState, (), (noexcept, override)); - using ForwardToRippledReturnType = std::expected; + using ForwardToRippledReturnType = std::expected; MOCK_METHOD( ForwardToRippledReturnType, forwardToRippled, diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index f47b80fd..0b9444fe 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -140,6 +140,7 @@ target_sources( util/async/AnyStrandTests.cpp util/async/AsyncExecutionContextTests.cpp util/BatchingTests.cpp + util/BlockingCacheTests.cpp util/ConceptsTests.cpp util/CoroutineGroupTests.cpp util/LedgerUtilsTests.cpp diff --git a/tests/unit/etl/LoadBalancerTests.cpp b/tests/unit/etl/LoadBalancerTests.cpp index 95d0216a..7fac8209 100644 --- a/tests/unit/etl/LoadBalancerTests.cpp +++ b/tests/unit/etl/LoadBalancerTests.cpp @@ -645,7 +645,7 @@ struct LoadBalancerForwardToRippledErrorTestBundle { std::string testName; rpc::ClioError firstSourceError; rpc::ClioError secondSourceError; - rpc::ClioError responseExpectedError; + rpc::CombinedError responseExpectedError; }; struct LoadBalancerForwardToRippledErrorTests @@ -776,7 +776,7 @@ TEST_F(LoadBalancerForwardToRippledTests, commandLineMissing) runSpawn([&](boost::asio::yield_context yield) { EXPECT_EQ( loadBalancer->forwardToRippled(request, clientIP_, false, yield).error(), - rpc::ClioError::RpcCommandIsMissing + rpc::CombinedError{rpc::ClioError::RpcCommandIsMissing} ); }); } diff --git a/tests/unit/etlng/LoadBalancerTests.cpp b/tests/unit/etlng/LoadBalancerTests.cpp index 9c7cc298..4ddfc7e3 100644 --- a/tests/unit/etlng/LoadBalancerTests.cpp +++ b/tests/unit/etlng/LoadBalancerTests.cpp @@ -672,7 +672,7 @@ struct LoadBalancerForwardToRippledErrorNgTestBundle { std::string testName; rpc::ClioError firstSourceError; rpc::ClioError secondSourceError; - rpc::ClioError responseExpectedError; + rpc::CombinedError responseExpectedError; }; struct LoadBalancerForwardToRippledErrorNgTests @@ -803,7 +803,7 @@ TEST_F(LoadBalancerForwardToRippledNgTests, commandLineMissing) runSpawn([&](boost::asio::yield_context yield) { EXPECT_EQ( loadBalancer->forwardToRippled(request, clientIP_, false, yield).error(), - rpc::ClioError::RpcCommandIsMissing + rpc::CombinedError{rpc::ClioError::RpcCommandIsMissing} ); }); } diff --git a/tests/unit/rpc/RPCEngineTests.cpp b/tests/unit/rpc/RPCEngineTests.cpp index 58020ff9..039eef2c 100644 --- a/tests/unit/rpc/RPCEngineTests.cpp +++ b/tests/unit/rpc/RPCEngineTests.cpp @@ -262,7 +262,7 @@ TEST_P(RPCEngineFlowParameterTest, Test) if (testBundle.response.has_value()) { EXPECT_EQ(*response, testBundle.response.value()); } else { - EXPECT_TRUE(*status == testBundle.status.value()); + EXPECT_EQ(*status, testBundle.status.value()); } }); } @@ -295,7 +295,7 @@ TEST_F(RPCEngineTest, ThrowDatabaseError) auto const res = engine->buildResponse(ctx); auto const status = std::get_if(&res.response); ASSERT_TRUE(status != nullptr); - EXPECT_TRUE(*status == Status{RippledError::rpcTOO_BUSY}); + EXPECT_EQ(*status, Status{RippledError::rpcTOO_BUSY}); }); } @@ -327,7 +327,7 @@ TEST_F(RPCEngineTest, ThrowException) auto const res = engine->buildResponse(ctx); auto const status = std::get_if(&res.response); ASSERT_TRUE(status != nullptr); - EXPECT_TRUE(*status == Status{RippledError::rpcINTERNAL}); + EXPECT_EQ(*status, Status{RippledError::rpcINTERNAL}); }); } @@ -453,7 +453,7 @@ TEST_P(RPCEngineCacheParameterTest, Test) auto const res = engine->buildResponse(ctx); auto const response = std::get_if(&res.response); - EXPECT_TRUE(*response == boost::json::parse(R"JSON({ "computed": "world_50"})JSON").as_object()); + EXPECT_EQ(*response, boost::json::parse(R"JSON({ "computed": "world_50"})JSON").as_object()); }); } } @@ -498,7 +498,8 @@ TEST_F(RPCEngineTest, NotCacheIfErrorHappen) auto const res = engine->buildResponse(ctx); auto const error = std::get_if(&res.response); - EXPECT_TRUE(*error == rpc::Status{"Very custom error"}); + ASSERT_NE(error, nullptr); + EXPECT_EQ(*error, rpc::Status{"Very custom error"}); }); } } diff --git a/tests/unit/util/BlockingCacheTests.cpp b/tests/unit/util/BlockingCacheTests.cpp new file mode 100644 index 00000000..2dcfe5cf --- /dev/null +++ b/tests/unit/util/BlockingCacheTests.cpp @@ -0,0 +1,252 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "util/AsioContextTestFixture.hpp" +#include "util/BlockingCache.hpp" +#include "util/NameGenerator.hpp" + +#include +#include +#include +#include + +#include + +using testing::MockFunction; +using testing::Return; +using testing::StrictMock; + +#include +#include + +#include +#include +#include + +struct BlockingCacheTest : SyncAsioContextTest { + using ErrorType = std::string; + using ValueType = int; + using Cache = util::BlockingCache; + using MockUpdater = StrictMock(boost::asio::yield_context)>>; + using MockVerifier = StrictMock>; + + std::unique_ptr cache = std::make_unique(); + MockUpdater mockUpdater; + MockVerifier mockVerifier; + int const value = 42; + std::string error = "some error"; +}; + +TEST_F(BlockingCacheTest, asyncGet_NoValueCacheUpdateSuccess) +{ + EXPECT_CALL(mockUpdater, Call).WillOnce(Return(value)); + EXPECT_CALL(mockVerifier, Call(value)).WillOnce(Return(true)); + + runSpawn([&](boost::asio::yield_context yield) { + auto result = cache->asyncGet(yield, mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value(), 42); + }); +} + +TEST_F(BlockingCacheTest, asyncGet_NoValueCacheUpdateFailure) +{ + EXPECT_CALL(mockUpdater, Call).WillOnce(Return(std::unexpected{error})); + + runSpawn([&](boost::asio::yield_context yield) { + auto result = cache->asyncGet(yield, mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); + + ASSERT_FALSE(result.has_value()); + EXPECT_EQ(result.error(), error); + }); +} + +TEST_F(BlockingCacheTest, asyncGet_NoValueCacheUpdateSuccessButVerifierRejects) +{ + runSpawn([&](boost::asio::yield_context yield) { + std::expected result; + { + EXPECT_CALL(mockUpdater, Call).WillOnce(Return(value)); + EXPECT_CALL(mockVerifier, Call(value)).WillOnce(Return(false)); + + result = cache->asyncGet(yield, mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value(), value); + } + + int const newValue = 24; + { + EXPECT_CALL(mockUpdater, Call).WillOnce(Return(newValue)); + EXPECT_CALL(mockVerifier, Call(newValue)).WillOnce(Return(true)); + + result = cache->asyncGet(yield, mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value(), newValue); + } + + result = cache->asyncGet(yield, mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value(), newValue); + }); +} + +TEST_F(BlockingCacheTest, asyncGet_HasValueCacheReturnsValue) +{ + cache = std::make_unique(value); + + runSpawn([&](boost::asio::yield_context yield) { + auto result = cache->asyncGet(yield, mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value(), value); + }); +} + +struct BlockingCacheWaitTestBundle { + bool updateSuccessful; + bool verifierAccepts; + std::string testName; +}; + +struct BlockingCacheWaitTest : BlockingCacheTest, testing::WithParamInterface {}; + +TEST_P(BlockingCacheWaitTest, WaitForUpdate) +{ + bool waitingCoroutineFinished = false; + + auto waitingCoroutine = [&](boost::asio::yield_context yield) { + auto result = cache->asyncGet(yield, mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); + + if (GetParam().updateSuccessful) { + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value(), value); + } else { + ASSERT_FALSE(result.has_value()); + EXPECT_EQ(result.error(), error); + } + waitingCoroutineFinished = true; + }; + + EXPECT_CALL(mockUpdater, Call) + .WillOnce([this, &waitingCoroutine](boost::asio::yield_context yield) -> std::expected { + boost::asio::spawn(yield, waitingCoroutine); + if (GetParam().updateSuccessful) { + return value; + } + return std::unexpected{error}; + }); + + if (GetParam().updateSuccessful) + EXPECT_CALL(mockVerifier, Call(value)).WillOnce(Return(GetParam().verifierAccepts)); + + runSpawnWithTimeout(std::chrono::seconds{1}, [&](boost::asio::yield_context yield) { + auto result = cache->asyncGet(yield, mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); + + if (GetParam().updateSuccessful) { + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value(), value); + } else { + ASSERT_FALSE(result.has_value()); + EXPECT_EQ(result.error(), error); + } + ASSERT_FALSE(waitingCoroutineFinished); + }); +} + +INSTANTIATE_TEST_SUITE_P( + BlockingCacheTest, + BlockingCacheWaitTest, + testing::Values( + BlockingCacheWaitTestBundle{ + .updateSuccessful = true, + .verifierAccepts = true, + .testName = "UpdateSucceedsVerifierAccepts" + }, + BlockingCacheWaitTestBundle{ + .updateSuccessful = true, + .verifierAccepts = false, + .testName = "UpdateSucceedsVerifierRejects" + }, + BlockingCacheWaitTestBundle{.updateSuccessful = false, .verifierAccepts = false, .testName = "UpdateFails"} + ), + tests::util::kNAME_GENERATOR +); + +TEST_F(BlockingCacheTest, InvalidateWhenStateIsNoValue) +{ + ASSERT_EQ(cache->state(), Cache::State::NoValue); + cache->invalidate(); + ASSERT_EQ(cache->state(), Cache::State::NoValue); +} + +TEST_F(BlockingCacheTest, InvalidateWhenStateIsUpdating) +{ + EXPECT_CALL(mockUpdater, Call).WillOnce([this](auto&&) { + EXPECT_EQ(cache->state(), Cache::State::Updating); + cache->invalidate(); + EXPECT_EQ(cache->state(), Cache::State::Updating); + return value; + }); + EXPECT_CALL(mockVerifier, Call(value)).WillOnce(Return(true)); + + runSpawn([&](boost::asio::yield_context yield) { + auto result = cache->asyncGet(yield, mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); + ASSERT_TRUE(result.has_value()); + ASSERT_EQ(result.value(), value); + ASSERT_EQ(cache->state(), Cache::State::HasValue); + }); +} + +TEST_F(BlockingCacheTest, InvalidateWhenStateIsHasValue) +{ + cache = std::make_unique(value); + ASSERT_EQ(cache->state(), Cache::State::HasValue); + cache->invalidate(); + EXPECT_EQ(cache->state(), Cache::State::NoValue); +} + +TEST_F(BlockingCacheTest, UpdateFromTwoCoroutinesHappensOnlyOnes) +{ + auto waitingCoroutine = [&](boost::asio::yield_context yield) { + auto result = cache->update(yield, mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); + ASSERT_TRUE(result.has_value()); + ASSERT_EQ(result.value(), value); + }; + + EXPECT_CALL(mockUpdater, Call) + .WillOnce([this, &waitingCoroutine](boost::asio::yield_context yield) -> std::expected { + boost::asio::spawn(yield, waitingCoroutine); + return value; + }); + EXPECT_CALL(mockVerifier, Call(value)).WillOnce(Return(true)); + + auto updatingCoroutine = [&](boost::asio::yield_context yield) { + auto const result = cache->update(yield, mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); + EXPECT_TRUE(result.has_value()); + ASSERT_EQ(result.value(), value); + }; + + runSpawnWithTimeout(std::chrono::seconds{1}, [&](boost::asio::yield_context yield) { + boost::asio::spawn(yield, updatingCoroutine); + }); +} diff --git a/tests/unit/util/ResponseExpirationCacheTests.cpp b/tests/unit/util/ResponseExpirationCacheTests.cpp index 5970b2de..ea7af34e 100644 --- a/tests/unit/util/ResponseExpirationCacheTests.cpp +++ b/tests/unit/util/ResponseExpirationCacheTests.cpp @@ -17,53 +17,292 @@ */ //============================================================================== +#include "rpc/Errors.hpp" +#include "util/AsioContextTestFixture.hpp" +#include "util/MockAssert.hpp" #include "util/ResponseExpirationCache.hpp" +#include #include +#include #include #include +#include #include +#include using namespace util; +using testing::MockFunction; +using testing::Return; +using testing::StrictMock; -struct ResponseExpirationCacheTests : public ::testing::Test { -protected: - ResponseExpirationCache cache_{std::chrono::seconds{100}, {"key"}}; - boost::json::object object_{{"key", "value"}}; +struct ResponseExpirationCacheTest : SyncAsioContextTest { + using MockUpdater = StrictMock(boost::asio::yield_context)>>; + using MockVerifier = StrictMock>; + + std::string const cmd = "server_info"; + boost::json::object const obj = {{"some key", "some value"}}; + MockUpdater mockUpdater; + MockVerifier mockVerifier; }; -TEST_F(ResponseExpirationCacheTests, PutAndGetNotExpired) +TEST_F(ResponseExpirationCacheTest, ShouldCacheDeterminesIfCommandIsCacheable) { - EXPECT_FALSE(cache_.get("key").has_value()); + std::unordered_set cmds = {cmd, "account_info"}; + ResponseExpirationCache cache{std::chrono::seconds(10), cmds}; - cache_.put("key", object_); - auto result = cache_.get("key"); - ASSERT_TRUE(result.has_value()); - EXPECT_EQ(*result, object_); - result = cache_.get("key2"); - ASSERT_FALSE(result.has_value()); + for (auto const& c : cmds) { + EXPECT_TRUE(cache.shouldCache(c)); + } - cache_.put("key2", object_); - result = cache_.get("key2"); - ASSERT_FALSE(result.has_value()); + EXPECT_FALSE(cache.shouldCache("account_tx")); + EXPECT_FALSE(cache.shouldCache("ledger")); + EXPECT_FALSE(cache.shouldCache("submit")); + EXPECT_FALSE(cache.shouldCache("")); } -TEST_F(ResponseExpirationCacheTests, Invalidate) +TEST_F(ResponseExpirationCacheTest, ShouldCacheEmptySetMeansNothingCacheable) { - cache_.put("key", object_); - cache_.invalidate(); - EXPECT_FALSE(cache_.get("key").has_value()); + std::unordered_set const emptyCmds; + ResponseExpirationCache cache{std::chrono::seconds(10), emptyCmds}; + + EXPECT_FALSE(cache.shouldCache("server_info")); + EXPECT_FALSE(cache.shouldCache("account_info")); + EXPECT_FALSE(cache.shouldCache("any_command")); + EXPECT_FALSE(cache.shouldCache("")); } -TEST_F(ResponseExpirationCacheTests, GetExpired) +TEST_F(ResponseExpirationCacheTest, ShouldCacheCaseMatchingIsRequired) { - ResponseExpirationCache cache{std::chrono::milliseconds{1}, {"key"}}; - auto const response = boost::json::object{{"key", "value"}}; + std::unordered_set const specificCmds = {cmd}; + ResponseExpirationCache cache{std::chrono::seconds(10), specificCmds}; - cache.put("key", response); - std::this_thread::sleep_for(std::chrono::milliseconds{2}); - - auto const result = cache.get("key"); - EXPECT_FALSE(result); + EXPECT_TRUE(cache.shouldCache(cmd)); + EXPECT_FALSE(cache.shouldCache("SERVER_INFO")); + EXPECT_FALSE(cache.shouldCache("Server_Info")); +} + +TEST_F(ResponseExpirationCacheTest, GetOrUpdateNoValueInCacheCallsUpdaterAndVerifier) +{ + ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}}; + + runSpawn([&](boost::asio::yield_context yield) { + EXPECT_CALL(mockUpdater, Call) + .WillOnce(Return(ResponseExpirationCache::EntryData{ + .lastUpdated = std::chrono::steady_clock::now(), + .response = obj, + })); + EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true)); + + auto result = + cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value(), obj); + }); +} + +TEST_F(ResponseExpirationCacheTest, GetOrUpdateExpiredValueInCacheCallsUpdaterAndVerifier) +{ + ResponseExpirationCache cache{std::chrono::milliseconds(1), {cmd}}; + + runSpawn([&](boost::asio::yield_context yield) { + boost::json::object const expiredObject = {{"some key", "expired value"}}; + EXPECT_CALL(mockUpdater, Call) + .WillOnce(Return(ResponseExpirationCache::EntryData{ + .lastUpdated = std::chrono::steady_clock::now(), + .response = expiredObject, + })); + EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true)); + + auto result = + cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value(), expiredObject); + + std::this_thread::sleep_for(std::chrono::milliseconds(2)); + + EXPECT_CALL(mockUpdater, Call) + .WillOnce(Return( + ResponseExpirationCache::EntryData{.lastUpdated = std::chrono::steady_clock::now(), .response = obj} + )); + EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true)); + + result = cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value(), obj); + }); +} + +TEST_F(ResponseExpirationCacheTest, GetOrUpdateCachedValueNotExpiredDoesNotCallUpdaterOrVerifier) +{ + ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}}; + + runSpawn([&](boost::asio::yield_context yield) { + // First call to populate cache + EXPECT_CALL(mockUpdater, Call) + .WillOnce(Return(ResponseExpirationCache::EntryData{ + .lastUpdated = std::chrono::steady_clock::now(), + .response = obj, + })); + EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true)); + + auto result = + cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value(), obj); + + // Second call should use cached value and not call updater/verifier + result = cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value(), obj); + }); +} + +TEST_F(ResponseExpirationCacheTest, GetOrUpdateHandlesErrorFromUpdater) +{ + ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}}; + + ResponseExpirationCache::Error const error{ + .status = rpc::Status{rpc::ClioError::EtlConnectionError}, .warnings = {} + }; + + runSpawn([&](boost::asio::yield_context yield) { + EXPECT_CALL(mockUpdater, Call).WillOnce(Return(std::unexpected(error))); + + auto result = + cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); + + ASSERT_FALSE(result.has_value()); + EXPECT_EQ(result.error(), error); + }); +} + +TEST_F(ResponseExpirationCacheTest, GetOrUpdateVerifierRejection) +{ + ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}}; + + runSpawn([&](boost::asio::yield_context yield) { + EXPECT_CALL(mockUpdater, Call) + .WillOnce(Return(ResponseExpirationCache::EntryData{ + .lastUpdated = std::chrono::steady_clock::now(), + .response = obj, + })); + EXPECT_CALL(mockVerifier, Call).WillOnce(Return(false)); + + auto result = + cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value(), obj); + + boost::json::object const anotherObj = {{"some key", "another value"}}; + EXPECT_CALL(mockUpdater, Call) + .WillOnce(Return(ResponseExpirationCache::EntryData{ + .lastUpdated = std::chrono::steady_clock::now(), + .response = anotherObj, + })); + EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true)); + + result = cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value(), anotherObj); + }); +} + +TEST_F(ResponseExpirationCacheTest, GetOrUpdateMultipleConcurrentUpdates) +{ + ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}}; + bool waitingCoroutineFinished = false; + + auto waitingCoroutine = [&](boost::asio::yield_context yield) { + auto result = + cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value(), obj); + waitingCoroutineFinished = true; + }; + + EXPECT_CALL(mockUpdater, Call) + .WillOnce( + [this, &waitingCoroutine](boost::asio::yield_context yield + ) -> std::expected { + boost::asio::spawn(yield, waitingCoroutine); + return ResponseExpirationCache::EntryData{ + .lastUpdated = std::chrono::steady_clock::now(), + .response = obj, + }; + } + ); + EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true)); + + runSpawnWithTimeout(std::chrono::seconds{1}, [&](boost::asio::yield_context yield) { + auto result = + cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value(), obj); + ASSERT_FALSE(waitingCoroutineFinished); + }); +} + +TEST_F(ResponseExpirationCacheTest, InvalidateForcesRefresh) +{ + ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}}; + + runSpawn([&](boost::asio::yield_context yield) { + boost::json::object oldObject = {{"some key", "old value"}}; + EXPECT_CALL(mockUpdater, Call) + .WillOnce(Return(ResponseExpirationCache::EntryData{ + .lastUpdated = std::chrono::steady_clock::now(), + .response = oldObject, + })); + EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true)); + + auto result = + cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value(), oldObject); + + cache.invalidate(); + + EXPECT_CALL(mockUpdater, Call) + .WillOnce(Return(ResponseExpirationCache::EntryData{ + .lastUpdated = std::chrono::steady_clock::now(), + .response = obj, + })); + EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true)); + + result = cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value(), obj); + }); +} + +struct ResponseExpirationCacheAssertTest : common::util::WithMockAssert, ResponseExpirationCacheTest {}; + +TEST_F(ResponseExpirationCacheAssertTest, NonCacheableCommandThrowsAssertion) +{ + ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}}; + + ASSERT_FALSE(cache.shouldCache("non_cacheable_command")); + + runSpawn([&](boost::asio::yield_context yield) { + EXPECT_CLIO_ASSERT_FAIL({ + [[maybe_unused]] + auto const v = cache.getOrUpdate( + yield, "non_cacheable_command", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction() + ); + }); + }); }