From b6c1e2578b4c465478294e57aff62e622a5c3385 Mon Sep 17 00:00:00 2001 From: Sergey Kuznetsov Date: Mon, 14 Jul 2025 13:47:13 +0100 Subject: [PATCH] chore: Remove using blocking cache (#2328) BlockingCache has a bug so reverting its usage for now. --- src/etl/LoadBalancer.cpp | 110 ++----- src/etl/LoadBalancer.hpp | 9 +- src/etlng/LoadBalancer.cpp | 110 ++----- src/etlng/LoadBalancer.hpp | 8 - src/rpc/RPCEngine.hpp | 87 ++--- src/util/ResponseExpirationCache.cpp | 84 ++--- src/util/ResponseExpirationCache.hpp | 130 ++++---- .../util/ResponseExpirationCacheTests.cpp | 308 ++---------------- 8 files changed, 260 insertions(+), 586 deletions(-) diff --git a/src/etl/LoadBalancer.cpp b/src/etl/LoadBalancer.cpp index 50fe7877..a7651232 100644 --- a/src/etl/LoadBalancer.cpp +++ b/src/etl/LoadBalancer.cpp @@ -57,7 +57,6 @@ #include #include #include -#include #include using namespace util::config; @@ -278,40 +277,46 @@ LoadBalancer::forwardToRippled( return std::unexpected{rpc::ClioError::RpcCommandIsMissing}; auto const cmd = boost::json::value_to(request.at("command")); - - if (forwardingCache_ and forwardingCache_->shouldCache(cmd)) { - bool servedFromCache = true; - auto updater = [this, &request, &clientIp, &servedFromCache, isAdmin](boost::asio::yield_context yield) - -> std::expected { - servedFromCache = false; - auto result = forwardToRippledImpl(request, clientIp, isAdmin, yield); - if (result.has_value()) { - return util::ResponseExpirationCache::EntryData{ - .lastUpdated = std::chrono::steady_clock::now(), .response = std::move(result).value() - }; - } - return std::unexpected{ - util::ResponseExpirationCache::Error{.status = rpc::Status{result.error()}, .warnings = {}} - }; - }; - - auto result = forwardingCache_->getOrUpdate( - yield, cmd, std::move(updater), [](util::ResponseExpirationCache::EntryData const& entry) { - return not entry.response.contains("error"); - } - ); - if (servedFromCache) { - ++forwardingCounters_.cacheHit.get(); + if (forwardingCache_) { + if (auto cachedResponse = forwardingCache_->get(cmd); cachedResponse) { + forwardingCounters_.cacheHit.get() += 1; + return std::move(cachedResponse).value(); } - if (result.has_value()) { - return std::move(result).value(); + } + forwardingCounters_.cacheMiss.get() += 1; + + ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests."); + std::size_t sourceIdx = randomGenerator_->uniform(0ul, sources_.size() - 1); + + auto numAttempts = 0u; + + auto xUserValue = isAdmin ? kADMIN_FORWARDING_X_USER_VALUE : kUSER_FORWARDING_X_USER_VALUE; + + std::optional response; + rpc::ClioError error = rpc::ClioError::EtlConnectionError; + while (numAttempts < sources_.size()) { + auto [res, duration] = + util::timed([&]() { return sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield); }); + if (res) { + forwardingCounters_.successDuration.get() += duration; + response = std::move(res).value(); + break; } - auto const combinedError = result.error().status.code; - ASSERT(std::holds_alternative(combinedError), "There could be only ClioError here"); - return std::unexpected{std::get(combinedError)}; + forwardingCounters_.failDuration.get() += duration; + ++forwardingCounters_.retries.get(); + error = std::max(error, res.error()); // Choose the best result between all sources + + sourceIdx = (sourceIdx + 1) % sources_.size(); + ++numAttempts; } - return forwardToRippledImpl(request, clientIp, isAdmin, yield); + if (response) { + if (forwardingCache_ and not response->contains("error")) + forwardingCache_->put(cmd, *response); + return std::move(response).value(); + } + + return std::unexpected{error}; } boost::json::value @@ -402,47 +407,4 @@ LoadBalancer::chooseForwardingSource() } } -std::expected -LoadBalancer::forwardToRippledImpl( - boost::json::object const& request, - std::optional const& clientIp, - bool const isAdmin, - boost::asio::yield_context yield -) -{ - ++forwardingCounters_.cacheMiss.get(); - - ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests."); - std::size_t sourceIdx = randomGenerator_->uniform(0ul, sources_.size() - 1); - - auto numAttempts = 0u; - - auto xUserValue = isAdmin ? kADMIN_FORWARDING_X_USER_VALUE : kUSER_FORWARDING_X_USER_VALUE; - - std::optional response; - rpc::ClioError error = rpc::ClioError::EtlConnectionError; - while (numAttempts < sources_.size()) { - auto [res, duration] = - util::timed([&]() { return sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield); }); - - if (res) { - forwardingCounters_.successDuration.get() += duration; - response = std::move(res).value(); - break; - } - forwardingCounters_.failDuration.get() += duration; - ++forwardingCounters_.retries.get(); - error = std::max(error, res.error()); // Choose the best result between all sources - - sourceIdx = (sourceIdx + 1) % sources_.size(); - ++numAttempts; - } - - if (response.has_value()) { - return std::move(response).value(); - } - - return std::unexpected{error}; -} - } // namespace etl diff --git a/src/etl/LoadBalancer.hpp b/src/etl/LoadBalancer.hpp index 2affb52b..0eb25403 100644 --- a/src/etl/LoadBalancer.hpp +++ b/src/etl/LoadBalancer.hpp @@ -49,6 +49,7 @@ #include #include #include +#include #include #include #include @@ -281,14 +282,6 @@ private: */ void chooseForwardingSource(); - - std::expected - forwardToRippledImpl( - boost::json::object const& request, - std::optional const& clientIp, - bool isAdmin, - boost::asio::yield_context yield - ); }; } // namespace etl diff --git a/src/etlng/LoadBalancer.cpp b/src/etlng/LoadBalancer.cpp index e3ae4bb6..8b0dd29f 100644 --- a/src/etlng/LoadBalancer.cpp +++ b/src/etlng/LoadBalancer.cpp @@ -58,7 +58,6 @@ #include #include #include -#include #include using namespace util::config; @@ -284,40 +283,46 @@ LoadBalancer::forwardToRippled( return std::unexpected{rpc::ClioError::RpcCommandIsMissing}; auto const cmd = boost::json::value_to(request.at("command")); - - if (forwardingCache_ and forwardingCache_->shouldCache(cmd)) { - bool servedFromCache = true; - auto updater = [this, &request, &clientIp, &servedFromCache, isAdmin](boost::asio::yield_context yield) - -> std::expected { - servedFromCache = false; - auto result = forwardToRippledImpl(request, clientIp, isAdmin, yield); - if (result.has_value()) { - return util::ResponseExpirationCache::EntryData{ - .lastUpdated = std::chrono::steady_clock::now(), .response = std::move(result).value() - }; - } - return std::unexpected{ - util::ResponseExpirationCache::Error{.status = rpc::Status{result.error()}, .warnings = {}} - }; - }; - - auto result = forwardingCache_->getOrUpdate( - yield, cmd, std::move(updater), [](util::ResponseExpirationCache::EntryData const& entry) { - return not entry.response.contains("error"); - } - ); - if (servedFromCache) { - ++forwardingCounters_.cacheHit.get(); + if (forwardingCache_) { + if (auto cachedResponse = forwardingCache_->get(cmd); cachedResponse) { + forwardingCounters_.cacheHit.get() += 1; + return std::move(cachedResponse).value(); } - if (result.has_value()) { - return std::move(result).value(); + } + forwardingCounters_.cacheMiss.get() += 1; + + ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests."); + std::size_t sourceIdx = randomGenerator_->uniform(0ul, sources_.size() - 1); + + auto numAttempts = 0u; + + auto xUserValue = isAdmin ? kADMIN_FORWARDING_X_USER_VALUE : kUSER_FORWARDING_X_USER_VALUE; + + std::optional response; + rpc::ClioError error = rpc::ClioError::EtlConnectionError; + while (numAttempts < sources_.size()) { + auto [res, duration] = + util::timed([&]() { return sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield); }); + if (res) { + forwardingCounters_.successDuration.get() += duration; + response = std::move(res).value(); + break; } - auto const combinedError = result.error().status.code; - ASSERT(std::holds_alternative(combinedError), "There could be only ClioError here"); - return std::unexpected{std::get(combinedError)}; + forwardingCounters_.failDuration.get() += duration; + ++forwardingCounters_.retries.get(); + error = std::max(error, res.error()); // Choose the best result between all sources + + sourceIdx = (sourceIdx + 1) % sources_.size(); + ++numAttempts; } - return forwardToRippledImpl(request, clientIp, isAdmin, yield); + if (response) { + if (forwardingCache_ and not response->contains("error")) + forwardingCache_->put(cmd, *response); + return std::move(response).value(); + } + + return std::unexpected{error}; } boost::json::value @@ -408,47 +413,4 @@ LoadBalancer::chooseForwardingSource() } } -std::expected -LoadBalancer::forwardToRippledImpl( - boost::json::object const& request, - std::optional const& clientIp, - bool isAdmin, - boost::asio::yield_context yield -) -{ - ++forwardingCounters_.cacheMiss.get(); - - ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests."); - std::size_t sourceIdx = randomGenerator_->uniform(0ul, sources_.size() - 1); - - auto numAttempts = 0u; - - auto xUserValue = isAdmin ? kADMIN_FORWARDING_X_USER_VALUE : kUSER_FORWARDING_X_USER_VALUE; - - std::optional response; - rpc::ClioError error = rpc::ClioError::EtlConnectionError; - while (numAttempts < sources_.size()) { - auto [res, duration] = - util::timed([&]() { return sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield); }); - - if (res) { - forwardingCounters_.successDuration.get() += duration; - response = std::move(res).value(); - break; - } - forwardingCounters_.failDuration.get() += duration; - ++forwardingCounters_.retries.get(); - error = std::max(error, res.error()); // Choose the best result between all sources - - sourceIdx = (sourceIdx + 1) % sources_.size(); - ++numAttempts; - } - - if (response.has_value()) { - return std::move(response).value(); - } - - return std::unexpected{error}; -} - } // namespace etlng diff --git a/src/etlng/LoadBalancer.hpp b/src/etlng/LoadBalancer.hpp index ebbb302b..760f35bf 100644 --- a/src/etlng/LoadBalancer.hpp +++ b/src/etlng/LoadBalancer.hpp @@ -282,14 +282,6 @@ private: */ void chooseForwardingSource(); - - std::expected - forwardToRippledImpl( - boost::json::object const& request, - std::optional const& clientIp, - bool isAdmin, - boost::asio::yield_context yield - ); }; } // namespace etlng diff --git a/src/rpc/RPCEngine.hpp b/src/rpc/RPCEngine.hpp index 0ae8596d..1bb8fe9b 100644 --- a/src/rpc/RPCEngine.hpp +++ b/src/rpc/RPCEngine.hpp @@ -157,48 +157,55 @@ public: return forwardingProxy_.forward(ctx); } - if (not ctx.isAdmin and responseCache_ and responseCache_->shouldCache(ctx.method)) { - auto updater = [this, &ctx](boost::asio::yield_context) - -> std::expected { - auto result = buildResponseImpl(ctx); - - auto const extracted = - [&result]() -> std::expected { - if (result.response.has_value()) { - return std::move(result.response).value(); - } - return std::unexpected{util::ResponseExpirationCache::Error{ - .status = std::move(result.response).error(), .warnings = std::move(result.warnings) - }}; - }(); - - if (extracted.has_value()) { - return util::ResponseExpirationCache::EntryData{ - .lastUpdated = std::chrono::steady_clock::now(), .response = std::move(extracted).value() - }; - } - return std::unexpected{std::move(extracted).error()}; - }; - - auto result = responseCache_->getOrUpdate( - ctx.yield, - ctx.method, - std::move(updater), - [&ctx](util::ResponseExpirationCache::EntryData const& entry) { - return not ctx.isAdmin and not entry.response.contains("error"); - } - ); - if (result.has_value()) { - return Result{std::move(result).value()}; - } - - auto error = std::move(result).error(); - Result errorResult{std::move(error.status)}; - errorResult.warnings = std::move(error.warnings); - return errorResult; + if (not ctx.isAdmin and responseCache_) { + if (auto res = responseCache_->get(ctx.method); res.has_value()) + return Result{std::move(res).value()}; } - return buildResponseImpl(ctx); + if (backend_->isTooBusy()) { + LOG(log_.error()) << "Database is too busy. Rejecting request"; + notifyTooBusy(); // TODO: should we add ctx.method if we have it? + return Result{Status{RippledError::rpcTOO_BUSY}}; + } + + auto const method = handlerProvider_->getHandler(ctx.method); + if (!method) { + notifyUnknownCommand(); + return Result{Status{RippledError::rpcUNKNOWN_COMMAND}}; + } + + try { + LOG(perfLog_.debug()) << ctx.tag() << " start executing rpc `" << ctx.method << '`'; + + auto const context = Context{ + .yield = ctx.yield, + .session = ctx.session, + .isAdmin = ctx.isAdmin, + .clientIp = ctx.clientIp, + .apiVersion = ctx.apiVersion + }; + auto v = (*method).process(ctx.params, context); + + LOG(perfLog_.debug()) << ctx.tag() << " finish executing rpc `" << ctx.method << '`'; + + if (not v) { + notifyErrored(ctx.method); + } else if (not ctx.isAdmin and responseCache_) { + responseCache_->put(ctx.method, v.result->as_object()); + } + + return Result{std::move(v)}; + } catch (data::DatabaseTimeout const& t) { + LOG(log_.error()) << "Database timeout"; + notifyTooBusy(); + + return Result{Status{RippledError::rpcTOO_BUSY}}; + } catch (std::exception const& ex) { + LOG(log_.error()) << ctx.tag() << "Caught exception: " << ex.what(); + notifyInternalError(); + + return Result{Status{RippledError::rpcINTERNAL}}; + } } /** diff --git a/src/util/ResponseExpirationCache.cpp b/src/util/ResponseExpirationCache.cpp index d5924f9f..7ebc0fcf 100644 --- a/src/util/ResponseExpirationCache.cpp +++ b/src/util/ResponseExpirationCache.cpp @@ -16,31 +16,44 @@ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ //============================================================================== - #include "util/ResponseExpirationCache.hpp" #include "util/Assert.hpp" -#include #include #include -#include +#include +#include +#include #include -#include #include namespace util { -ResponseExpirationCache::ResponseExpirationCache( - std::chrono::steady_clock::duration cacheTimeout, - std::unordered_set const& cmds -) - : cacheTimeout_(cacheTimeout) +void +ResponseExpirationCache::Entry::put(boost::json::object response) { - for (auto const& command : cmds) { - cache_.emplace(command, std::make_unique()); - } + response_ = std::move(response); + lastUpdated_ = std::chrono::steady_clock::now(); +} + +std::optional +ResponseExpirationCache::Entry::get() const +{ + return response_; +} + +std::chrono::steady_clock::time_point +ResponseExpirationCache::Entry::lastUpdated() const +{ + return lastUpdated_; +} + +void +ResponseExpirationCache::Entry::invalidate() +{ + response_.reset(); } bool @@ -49,41 +62,38 @@ ResponseExpirationCache::shouldCache(std::string const& cmd) return cache_.contains(cmd); } -std::expected -ResponseExpirationCache::getOrUpdate( - boost::asio::yield_context yield, - std::string const& cmd, - Updater updater, - Verifier verifier -) +std::optional +ResponseExpirationCache::get(std::string const& cmd) const { auto it = cache_.find(cmd); - ASSERT(it != cache_.end(), "Can't get a value which is not in the cache"); + if (it == cache_.end()) + return std::nullopt; - auto& entry = it->second; - { - auto result = entry->asyncGet(yield, updater, verifier); - if (not result.has_value()) { - return std::unexpected{std::move(result).error()}; - } - if (std::chrono::steady_clock::now() - result->lastUpdated < cacheTimeout_) { - return std::move(result)->response; - } - } + auto const& entry = it->second.lock(); + if (std::chrono::steady_clock::now() - entry->lastUpdated() > cacheTimeout_) + return std::nullopt; - // Force update due to cache timeout - auto result = entry->update(yield, std::move(updater), std::move(verifier)); - if (not result.has_value()) { - return std::unexpected{std::move(result).error()}; - } - return std::move(result)->response; + return entry->get(); +} + +void +ResponseExpirationCache::put(std::string const& cmd, boost::json::object const& response) +{ + if (not shouldCache(cmd)) + return; + + ASSERT(cache_.contains(cmd), "Command is not in the cache: {}", cmd); + + auto entry = cache_[cmd].lock(); + entry->put(response); } void ResponseExpirationCache::invalidate() { for (auto& [_, entry] : cache_) { - entry->invalidate(); + auto entryLock = entry.lock(); + entryLock->invalidate(); } } diff --git a/src/util/ResponseExpirationCache.hpp b/src/util/ResponseExpirationCache.hpp index 61cbf6d8..35bcf241 100644 --- a/src/util/ResponseExpirationCache.hpp +++ b/src/util/ResponseExpirationCache.hpp @@ -16,18 +16,15 @@ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ //============================================================================== - #pragma once -#include "rpc/Errors.hpp" -#include "util/BlockingCache.hpp" +#include "util/Mutex.hpp" -#include -#include #include #include -#include +#include +#include #include #include #include @@ -36,89 +33,94 @@ namespace util { /** * @brief Cache of requests' responses with TTL support and configurable cacheable commands - * - * This class implements a time-based expiration cache for RPC responses. It allows - * caching responses for specified commands and automatically invalidates them after - * a configured timeout period. The cache uses BlockingCache internally to handle - * concurrent access and updates. */ class ResponseExpirationCache { -public: /** - * @brief A data structure to store a cache entry with its timestamp + * @brief A class to store a cache entry. */ - struct EntryData { - std::chrono::steady_clock::time_point lastUpdated; ///< When the entry was last updated - boost::json::object response; ///< The cached response data + class Entry { + std::chrono::steady_clock::time_point lastUpdated_; + std::optional response_; + + public: + /** + * @brief Put a response into the cache + * + * @param response The response to store + */ + void + put(boost::json::object response); + + /** + * @brief Get the response from the cache + * + * @return The response + */ + std::optional + get() const; + + /** + * @brief Get the last time the cache was updated + * + * @return The last time the cache was updated + */ + std::chrono::steady_clock::time_point + lastUpdated() const; + + /** + * @brief Invalidate the cache entry + */ + void + invalidate(); }; - /** - * @brief A data structure to represent errors that can occur during an update of the cache - */ - struct Error { - rpc::Status status; ///< The status code and message of the error - boost::json::array warnings; ///< Any warnings related to the request - - bool - operator==(Error const&) const = default; - }; - - using CacheEntry = util::BlockingCache; - -private: std::chrono::steady_clock::duration cacheTimeout_; - std::unordered_map> cache_; + std::unordered_map> cache_; + + bool + shouldCache(std::string const& cmd); public: /** - * @brief Construct a new ResponseExpirationCache object + * @brief Construct a new Cache object * - * @param cacheTimeout The time period after which cached entries expire - * @param cmds The commands that should be cached (requests for other commands won't be cached) + * @param cacheTimeout The time for cache entries to expire + * @param cmds The commands that should be cached */ ResponseExpirationCache( std::chrono::steady_clock::duration cacheTimeout, std::unordered_set const& cmds - ); + ) + : cacheTimeout_(cacheTimeout) + { + for (auto const& command : cmds) { + cache_.emplace(command, Entry{}); + } + } /** - * @brief Check if the given command should be cached + * @brief Get a response from the cache * - * @param cmd The command to check - * @return true if the command should be cached, false otherwise - */ - bool - shouldCache(std::string const& cmd); - - using Updater = CacheEntry::Updater; - using Verifier = CacheEntry::Verifier; - - /** - * @brief Get a cached response or update the cache if necessary - * - * This method returns a cached response if it exists and hasn't expired. - * If the cache entry is expired or doesn't exist, it calls the updater to - * generate a new value. If multiple coroutines request the same entry - * simultaneously, only one updater will be called while others wait. - * - * @note cmd must be one of the commands that are cached. There is an ASSERT() inside the function - * - * @param yield Asio yield context for coroutine suspension * @param cmd The command to get the response for - * @param updater Function to generate the response if not in cache or expired - * @param verifier Function to validate if a response should be cached - * @return The cached or newly generated response, or an error + * @return The response if it exists or std::nullopt otherwise */ - [[nodiscard]] std::expected - getOrUpdate(boost::asio::yield_context yield, std::string const& cmd, Updater updater, Verifier verifier); + [[nodiscard]] std::optional + get(std::string const& cmd) const; + + /** + * @brief Put a response into the cache if the request should be cached + * + * @param cmd The command to store the response for + * @param response The response to store + */ + void + put(std::string const& cmd, boost::json::object const& response); /** * @brief Invalidate all entries in the cache - * - * This causes all cached entries to be cleared, forcing the next access - * to generate new responses. */ void invalidate(); }; + } // namespace util diff --git a/tests/unit/util/ResponseExpirationCacheTests.cpp b/tests/unit/util/ResponseExpirationCacheTests.cpp index 0038f709..5970b2de 100644 --- a/tests/unit/util/ResponseExpirationCacheTests.cpp +++ b/tests/unit/util/ResponseExpirationCacheTests.cpp @@ -17,307 +17,53 @@ */ //============================================================================== -#include "rpc/Errors.hpp" -#include "util/AsioContextTestFixture.hpp" -#include "util/MockAssert.hpp" #include "util/ResponseExpirationCache.hpp" -#include #include -#include #include #include -#include #include -#include using namespace util; -using testing::MockFunction; -using testing::Return; -using testing::StrictMock; -struct ResponseExpirationCacheTest : SyncAsioContextTest { - using MockUpdater = StrictMock(boost::asio::yield_context)>>; - using MockVerifier = StrictMock>; - - std::string const cmd = "server_info"; - boost::json::object const obj = {{"some key", "some value"}}; - MockUpdater mockUpdater; - MockVerifier mockVerifier; +struct ResponseExpirationCacheTests : public ::testing::Test { +protected: + ResponseExpirationCache cache_{std::chrono::seconds{100}, {"key"}}; + boost::json::object object_{{"key", "value"}}; }; -TEST_F(ResponseExpirationCacheTest, ShouldCacheDeterminesIfCommandIsCacheable) +TEST_F(ResponseExpirationCacheTests, PutAndGetNotExpired) { - std::unordered_set const cmds = {cmd, "account_info"}; - ResponseExpirationCache cache{std::chrono::seconds(10), cmds}; + EXPECT_FALSE(cache_.get("key").has_value()); - for (auto const& c : cmds) { - EXPECT_TRUE(cache.shouldCache(c)); - } + cache_.put("key", object_); + auto result = cache_.get("key"); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(*result, object_); + result = cache_.get("key2"); + ASSERT_FALSE(result.has_value()); - EXPECT_FALSE(cache.shouldCache("account_tx")); - EXPECT_FALSE(cache.shouldCache("ledger")); - EXPECT_FALSE(cache.shouldCache("submit")); - EXPECT_FALSE(cache.shouldCache("")); + cache_.put("key2", object_); + result = cache_.get("key2"); + ASSERT_FALSE(result.has_value()); } -TEST_F(ResponseExpirationCacheTest, ShouldCacheEmptySetMeansNothingCacheable) +TEST_F(ResponseExpirationCacheTests, Invalidate) { - std::unordered_set const emptyCmds; - ResponseExpirationCache cache{std::chrono::seconds(10), emptyCmds}; - - EXPECT_FALSE(cache.shouldCache("server_info")); - EXPECT_FALSE(cache.shouldCache("account_info")); - EXPECT_FALSE(cache.shouldCache("any_command")); - EXPECT_FALSE(cache.shouldCache("")); + cache_.put("key", object_); + cache_.invalidate(); + EXPECT_FALSE(cache_.get("key").has_value()); } -TEST_F(ResponseExpirationCacheTest, ShouldCacheCaseMatchingIsRequired) +TEST_F(ResponseExpirationCacheTests, GetExpired) { - std::unordered_set const specificCmds = {cmd}; - ResponseExpirationCache cache{std::chrono::seconds(10), specificCmds}; + ResponseExpirationCache cache{std::chrono::milliseconds{1}, {"key"}}; + auto const response = boost::json::object{{"key", "value"}}; - EXPECT_TRUE(cache.shouldCache(cmd)); - EXPECT_FALSE(cache.shouldCache("SERVER_INFO")); - EXPECT_FALSE(cache.shouldCache("Server_Info")); -} - -TEST_F(ResponseExpirationCacheTest, GetOrUpdateNoValueInCacheCallsUpdaterAndVerifier) -{ - ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}}; - - runSpawn([&](boost::asio::yield_context yield) { - EXPECT_CALL(mockUpdater, Call) - .WillOnce(Return( - ResponseExpirationCache::EntryData{ - .lastUpdated = std::chrono::steady_clock::now(), - .response = obj, - } - )); - EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true)); - - auto result = - cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); - - ASSERT_TRUE(result.has_value()); - EXPECT_EQ(result.value(), obj); - }); -} - -TEST_F(ResponseExpirationCacheTest, GetOrUpdateExpiredValueInCacheCallsUpdaterAndVerifier) -{ - ResponseExpirationCache cache{std::chrono::milliseconds(1), {cmd}}; - - runSpawn([&](boost::asio::yield_context yield) { - boost::json::object const expiredObject = {{"some key", "expired value"}}; - EXPECT_CALL(mockUpdater, Call) - .WillOnce(Return( - ResponseExpirationCache::EntryData{ - .lastUpdated = std::chrono::steady_clock::now(), - .response = expiredObject, - } - )); - EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true)); - - auto result = - cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); - - ASSERT_TRUE(result.has_value()); - EXPECT_EQ(result.value(), expiredObject); - - std::this_thread::sleep_for(std::chrono::milliseconds(2)); - - EXPECT_CALL(mockUpdater, Call) - .WillOnce(Return( - ResponseExpirationCache::EntryData{.lastUpdated = std::chrono::steady_clock::now(), .response = obj} - )); - EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true)); - - result = cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); - - ASSERT_TRUE(result.has_value()); - EXPECT_EQ(result.value(), obj); - }); -} - -TEST_F(ResponseExpirationCacheTest, GetOrUpdateCachedValueNotExpiredDoesNotCallUpdaterOrVerifier) -{ - ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}}; - - runSpawn([&](boost::asio::yield_context yield) { - // First call to populate cache - EXPECT_CALL(mockUpdater, Call) - .WillOnce(Return( - ResponseExpirationCache::EntryData{ - .lastUpdated = std::chrono::steady_clock::now(), - .response = obj, - } - )); - EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true)); - - auto result = - cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); - - ASSERT_TRUE(result.has_value()); - EXPECT_EQ(result.value(), obj); - - // Second call should use cached value and not call updater/verifier - result = cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); - - ASSERT_TRUE(result.has_value()); - EXPECT_EQ(result.value(), obj); - }); -} - -TEST_F(ResponseExpirationCacheTest, GetOrUpdateHandlesErrorFromUpdater) -{ - ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}}; - - ResponseExpirationCache::Error const error{ - .status = rpc::Status{rpc::ClioError::EtlConnectionError}, .warnings = {} - }; - - runSpawn([&](boost::asio::yield_context yield) { - EXPECT_CALL(mockUpdater, Call).WillOnce(Return(std::unexpected(error))); - - auto result = - cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); - - ASSERT_FALSE(result.has_value()); - EXPECT_EQ(result.error(), error); - }); -} - -TEST_F(ResponseExpirationCacheTest, GetOrUpdateVerifierRejection) -{ - ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}}; - - runSpawn([&](boost::asio::yield_context yield) { - EXPECT_CALL(mockUpdater, Call) - .WillOnce(Return( - ResponseExpirationCache::EntryData{ - .lastUpdated = std::chrono::steady_clock::now(), - .response = obj, - } - )); - EXPECT_CALL(mockVerifier, Call).WillOnce(Return(false)); - - auto result = - cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); - - ASSERT_TRUE(result.has_value()); - EXPECT_EQ(result.value(), obj); - - boost::json::object const anotherObj = {{"some key", "another value"}}; - EXPECT_CALL(mockUpdater, Call) - .WillOnce(Return( - ResponseExpirationCache::EntryData{ - .lastUpdated = std::chrono::steady_clock::now(), - .response = anotherObj, - } - )); - EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true)); - - result = cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); - - ASSERT_TRUE(result.has_value()); - EXPECT_EQ(result.value(), anotherObj); - }); -} - -TEST_F(ResponseExpirationCacheTest, GetOrUpdateMultipleConcurrentUpdates) -{ - ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}}; - bool waitingCoroutineFinished = false; - - auto waitingCoroutine = [&](boost::asio::yield_context yield) { - auto result = - cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); - - ASSERT_TRUE(result.has_value()); - EXPECT_EQ(result.value(), obj); - waitingCoroutineFinished = true; - }; - - EXPECT_CALL(mockUpdater, Call) - .WillOnce( - [this, &waitingCoroutine]( - boost::asio::yield_context yield - ) -> std::expected { - boost::asio::spawn(yield, waitingCoroutine); - return ResponseExpirationCache::EntryData{ - .lastUpdated = std::chrono::steady_clock::now(), - .response = obj, - }; - } - ); - EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true)); - - runSpawnWithTimeout(std::chrono::seconds{1}, [&](boost::asio::yield_context yield) { - auto result = - cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); - - ASSERT_TRUE(result.has_value()); - EXPECT_EQ(result.value(), obj); - ASSERT_FALSE(waitingCoroutineFinished); - }); -} - -TEST_F(ResponseExpirationCacheTest, InvalidateForcesRefresh) -{ - ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}}; - - runSpawn([&](boost::asio::yield_context yield) { - boost::json::object const oldObject = {{"some key", "old value"}}; - EXPECT_CALL(mockUpdater, Call) - .WillOnce(Return( - ResponseExpirationCache::EntryData{ - .lastUpdated = std::chrono::steady_clock::now(), - .response = oldObject, - } - )); - EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true)); - - auto result = - cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); - - ASSERT_TRUE(result.has_value()); - EXPECT_EQ(result.value(), oldObject); - - cache.invalidate(); - - EXPECT_CALL(mockUpdater, Call) - .WillOnce(Return( - ResponseExpirationCache::EntryData{ - .lastUpdated = std::chrono::steady_clock::now(), - .response = obj, - } - )); - EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true)); - - result = cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()); - - ASSERT_TRUE(result.has_value()); - EXPECT_EQ(result.value(), obj); - }); -} - -struct ResponseExpirationCacheAssertTest : common::util::WithMockAssert, ResponseExpirationCacheTest {}; - -TEST_F(ResponseExpirationCacheAssertTest, NonCacheableCommandThrowsAssertion) -{ - ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}}; - - ASSERT_FALSE(cache.shouldCache("non_cacheable_command")); - - runSpawn([&](boost::asio::yield_context yield) { - EXPECT_CLIO_ASSERT_FAIL({ - [[maybe_unused]] - auto const v = cache.getOrUpdate( - yield, "non_cacheable_command", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction() - ); - }); - }); + cache.put("key", response); + std::this_thread::sleep_for(std::chrono::milliseconds{2}); + + auto const result = cache.get("key"); + EXPECT_FALSE(result); }