feat: More efficient cache (#1997)

Fixes #1473.
This commit is contained in:
Sergey Kuznetsov
2025-04-17 16:44:53 +01:00
committed by GitHub
parent 39d1ceace4
commit 46514c8fe9
16 changed files with 1092 additions and 260 deletions

View File

@@ -54,6 +54,7 @@
#include <string>
#include <thread>
#include <utility>
#include <variant>
#include <vector>
using namespace util::config;
@@ -227,7 +228,7 @@ LoadBalancer::fetchLedger(
return response;
}
std::expected<boost::json::object, rpc::ClioError>
std::expected<boost::json::object, rpc::CombinedError>
LoadBalancer::forwardToRippled(
boost::json::object const& request,
std::optional<std::string> const& clientIp,
@@ -239,40 +240,37 @@ LoadBalancer::forwardToRippled(
return std::unexpected{rpc::ClioError::RpcCommandIsMissing};
auto const cmd = boost::json::value_to<std::string>(request.at("command"));
if (forwardingCache_) {
if (auto cachedResponse = forwardingCache_->get(cmd); cachedResponse) {
return std::move(cachedResponse).value();
if (forwardingCache_ and forwardingCache_->shouldCache(cmd)) {
auto updater =
[this, &request, &clientIp, isAdmin](boost::asio::yield_context yield
) -> std::expected<util::ResponseExpirationCache::EntryData, util::ResponseExpirationCache::Error> {
auto result = forwardToRippledImpl(request, clientIp, isAdmin, yield);
if (result.has_value()) {
return util::ResponseExpirationCache::EntryData{
.lastUpdated = std::chrono::steady_clock::now(), .response = std::move(result).value()
};
}
return std::unexpected{
util::ResponseExpirationCache::Error{.status = rpc::Status{result.error()}, .warnings = {}}
};
};
auto result = forwardingCache_->getOrUpdate(
yield,
cmd,
std::move(updater),
[](util::ResponseExpirationCache::EntryData const& entry) { return not entry.response.contains("error"); }
);
if (result.has_value()) {
return std::move(result).value();
}
auto const combinedError = result.error().status.code;
ASSERT(std::holds_alternative<rpc::ClioError>(combinedError), "There could be only ClioError here");
return std::unexpected{std::get<rpc::ClioError>(combinedError)};
}
ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests.");
std::size_t sourceIdx = util::Random::uniform(0ul, sources_.size() - 1);
auto numAttempts = 0u;
auto xUserValue = isAdmin ? kADMIN_FORWARDING_X_USER_VALUE : kUSER_FORWARDING_X_USER_VALUE;
std::optional<boost::json::object> response;
rpc::ClioError error = rpc::ClioError::EtlConnectionError;
while (numAttempts < sources_.size()) {
auto res = sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield);
if (res) {
response = std::move(res).value();
break;
}
error = std::max(error, res.error()); // Choose the best result between all sources
sourceIdx = (sourceIdx + 1) % sources_.size();
++numAttempts;
}
if (response) {
if (forwardingCache_ and not response->contains("error"))
forwardingCache_->put(cmd, *response);
return std::move(response).value();
}
return std::unexpected{error};
return forwardToRippledImpl(request, clientIp, isAdmin, yield);
}
boost::json::value
@@ -363,4 +361,40 @@ LoadBalancer::chooseForwardingSource()
}
}
std::expected<boost::json::object, rpc::CombinedError>
LoadBalancer::forwardToRippledImpl(
boost::json::object const& request,
std::optional<std::string> const& clientIp,
bool const isAdmin,
boost::asio::yield_context yield
)
{
ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests.");
std::size_t sourceIdx = util::Random::uniform(0ul, sources_.size() - 1);
auto numAttempts = 0u;
auto xUserValue = isAdmin ? kADMIN_FORWARDING_X_USER_VALUE : kUSER_FORWARDING_X_USER_VALUE;
std::optional<boost::json::object> response;
rpc::ClioError error = rpc::ClioError::EtlConnectionError;
while (numAttempts < sources_.size()) {
auto res = sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield);
if (res) {
response = std::move(res).value();
break;
}
error = std::max(error, res.error()); // Choose the best result between all sources
sourceIdx = (sourceIdx + 1) % sources_.size();
++numAttempts;
}
if (response.has_value()) {
return std::move(response).value();
}
return std::unexpected{error};
}
} // namespace etl

View File

@@ -44,6 +44,7 @@
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <chrono>
#include <concepts>
#include <cstdint>
#include <expected>
#include <memory>
@@ -218,7 +219,7 @@ public:
* @param yield The coroutine context
* @return Response received from rippled node as JSON object on success or error on failure
*/
std::expected<boost::json::object, rpc::ClioError>
std::expected<boost::json::object, rpc::CombinedError>
forwardToRippled(
boost::json::object const& request,
std::optional<std::string> const& clientIp,
@@ -264,6 +265,14 @@ private:
*/
void
chooseForwardingSource();
std::expected<boost::json::object, rpc::CombinedError>
forwardToRippledImpl(
boost::json::object const& request,
std::optional<std::string> const& clientIp,
bool isAdmin,
boost::asio::yield_context yield
);
};
} // namespace etl

View File

@@ -55,6 +55,7 @@
#include <string>
#include <thread>
#include <utility>
#include <variant>
#include <vector>
using namespace util::config;
@@ -231,7 +232,7 @@ LoadBalancer::fetchLedger(
return response;
}
std::expected<boost::json::object, rpc::ClioError>
std::expected<boost::json::object, rpc::CombinedError>
LoadBalancer::forwardToRippled(
boost::json::object const& request,
std::optional<std::string> const& clientIp,
@@ -243,40 +244,37 @@ LoadBalancer::forwardToRippled(
return std::unexpected{rpc::ClioError::RpcCommandIsMissing};
auto const cmd = boost::json::value_to<std::string>(request.at("command"));
if (forwardingCache_) {
if (auto cachedResponse = forwardingCache_->get(cmd); cachedResponse) {
return std::move(cachedResponse).value();
if (forwardingCache_ and forwardingCache_->shouldCache(cmd)) {
auto updater =
[this, &request, &clientIp, isAdmin](boost::asio::yield_context yield
) -> std::expected<util::ResponseExpirationCache::EntryData, util::ResponseExpirationCache::Error> {
auto result = forwardToRippledImpl(request, clientIp, isAdmin, yield);
if (result.has_value()) {
return util::ResponseExpirationCache::EntryData{
.lastUpdated = std::chrono::steady_clock::now(), .response = std::move(result).value()
};
}
return std::unexpected{
util::ResponseExpirationCache::Error{.status = rpc::Status{result.error()}, .warnings = {}}
};
};
auto result = forwardingCache_->getOrUpdate(
yield,
cmd,
std::move(updater),
[](util::ResponseExpirationCache::EntryData const& entry) { return not entry.response.contains("error"); }
);
if (result.has_value()) {
return std::move(result).value();
}
auto const combinedError = result.error().status.code;
ASSERT(std::holds_alternative<rpc::ClioError>(combinedError), "There could be only ClioError here");
return std::unexpected{std::get<rpc::ClioError>(combinedError)};
}
ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests.");
std::size_t sourceIdx = util::Random::uniform(0ul, sources_.size() - 1);
auto numAttempts = 0u;
auto xUserValue = isAdmin ? kADMIN_FORWARDING_X_USER_VALUE : kUSER_FORWARDING_X_USER_VALUE;
std::optional<boost::json::object> response;
rpc::ClioError error = rpc::ClioError::EtlConnectionError;
while (numAttempts < sources_.size()) {
auto res = sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield);
if (res) {
response = std::move(res).value();
break;
}
error = std::max(error, res.error()); // Choose the best result between all sources
sourceIdx = (sourceIdx + 1) % sources_.size();
++numAttempts;
}
if (response) {
if (forwardingCache_ and not response->contains("error"))
forwardingCache_->put(cmd, *response);
return std::move(response).value();
}
return std::unexpected{error};
return forwardToRippledImpl(request, clientIp, isAdmin, yield);
}
boost::json::value
@@ -367,4 +365,40 @@ LoadBalancer::chooseForwardingSource()
}
}
std::expected<boost::json::object, rpc::CombinedError>
LoadBalancer::forwardToRippledImpl(
boost::json::object const& request,
std::optional<std::string> const& clientIp,
bool isAdmin,
boost::asio::yield_context yield
)
{
ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests.");
std::size_t sourceIdx = util::Random::uniform(0ul, sources_.size() - 1);
auto numAttempts = 0u;
auto xUserValue = isAdmin ? kADMIN_FORWARDING_X_USER_VALUE : kUSER_FORWARDING_X_USER_VALUE;
std::optional<boost::json::object> response;
rpc::ClioError error = rpc::ClioError::EtlConnectionError;
while (numAttempts < sources_.size()) {
auto res = sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield);
if (res) {
response = std::move(res).value();
break;
}
error = std::max(error, res.error()); // Choose the best result between all sources
sourceIdx = (sourceIdx + 1) % sources_.size();
++numAttempts;
}
if (response.has_value()) {
return std::move(response).value();
}
return std::unexpected{error};
}
} // namespace etlng

View File

@@ -44,6 +44,7 @@
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <chrono>
#include <concepts>
#include <cstdint>
#include <expected>
#include <memory>
@@ -220,7 +221,7 @@ public:
* @param yield The coroutine context
* @return Response received from rippled node as JSON object on success or error on failure
*/
std::expected<boost::json::object, rpc::ClioError>
std::expected<boost::json::object, rpc::CombinedError>
forwardToRippled(
boost::json::object const& request,
std::optional<std::string> const& clientIp,
@@ -266,6 +267,14 @@ private:
*/
void
chooseForwardingSource();
std::expected<boost::json::object, rpc::CombinedError>
forwardToRippledImpl(
boost::json::object const& request,
std::optional<std::string> const& clientIp,
bool isAdmin,
boost::asio::yield_context yield
);
};
} // namespace etlng

View File

@@ -115,7 +115,7 @@ public:
* @param yield The coroutine context
* @return Response received from rippled node as JSON object on success or error on failure
*/
virtual std::expected<boost::json::object, rpc::ClioError>
virtual std::expected<boost::json::object, rpc::CombinedError>
forwardToRippled(
boost::json::object const& request,
std::optional<std::string> const& clientIp,

View File

@@ -27,6 +27,7 @@
#include "rpc/common/HandlerProvider.hpp"
#include "rpc/common/Types.hpp"
#include "rpc/common/impl/ForwardingProxy.hpp"
#include "util/OverloadSet.hpp"
#include "util/ResponseExpirationCache.hpp"
#include "util/log/Logger.hpp"
#include "web/Context.hpp"
@@ -35,6 +36,7 @@
#include <boost/asio/spawn.hpp>
#include <boost/iterator/transform_iterator.hpp>
#include <boost/json.hpp>
#include <boost/json/object.hpp>
#include <fmt/core.h>
#include <fmt/format.h>
#include <xrpl/protocol/ErrorCodes.h>
@@ -156,55 +158,51 @@ public:
return forwardingProxy_.forward(ctx);
}
if (not ctx.isAdmin and responseCache_) {
if (auto res = responseCache_->get(ctx.method); res.has_value())
return Result{std::move(res).value()};
}
if (backend_->isTooBusy()) {
LOG(log_.error()) << "Database is too busy. Rejecting request";
notifyTooBusy(); // TODO: should we add ctx.method if we have it?
return Result{Status{RippledError::rpcTOO_BUSY}};
}
auto const method = handlerProvider_->getHandler(ctx.method);
if (!method) {
notifyUnknownCommand();
return Result{Status{RippledError::rpcUNKNOWN_COMMAND}};
}
try {
LOG(perfLog_.debug()) << ctx.tag() << " start executing rpc `" << ctx.method << '`';
auto const context = Context{
.yield = ctx.yield,
.session = ctx.session,
.isAdmin = ctx.isAdmin,
.clientIp = ctx.clientIp,
.apiVersion = ctx.apiVersion
if (not ctx.isAdmin and responseCache_ and responseCache_->shouldCache(ctx.method)) {
auto updater =
[this, &ctx](boost::asio::yield_context
) -> std::expected<util::ResponseExpirationCache::EntryData, util::ResponseExpirationCache::Error> {
auto result = buildResponseImpl(ctx);
auto extracted = std::visit(
util::OverloadSet{
[&result](Status status
) -> std::expected<boost::json::object, util::ResponseExpirationCache::Error> {
return std::unexpected{util::ResponseExpirationCache::Error{
.status = std::move(status), .warnings = std::move(result.warnings)
}};
},
[](boost::json::object obj
) -> std::expected<boost::json::object, util::ResponseExpirationCache::Error> { return obj; }
},
std::move(result.response)
);
if (extracted.has_value()) {
return util::ResponseExpirationCache::EntryData{
.lastUpdated = std::chrono::steady_clock::now(), .response = std::move(extracted).value()
};
}
return std::unexpected{std::move(extracted).error()};
};
auto v = (*method).process(ctx.params, context);
LOG(perfLog_.debug()) << ctx.tag() << " finish executing rpc `" << ctx.method << '`';
if (not v) {
notifyErrored(ctx.method);
} else if (not ctx.isAdmin and responseCache_) {
responseCache_->put(ctx.method, v.result->as_object());
auto result = responseCache_->getOrUpdate(
ctx.yield,
ctx.method,
std::move(updater),
[&ctx](util::ResponseExpirationCache::EntryData const& entry) {
return not ctx.isAdmin and not entry.response.contains("error");
}
);
if (result.has_value()) {
return Result{std::move(result).value()};
}
return Result{std::move(v)};
} catch (data::DatabaseTimeout const& t) {
LOG(log_.error()) << "Database timeout";
notifyTooBusy();
return Result{Status{RippledError::rpcTOO_BUSY}};
} catch (std::exception const& ex) {
LOG(log_.error()) << ctx.tag() << "Caught exception: " << ex.what();
notifyInternalError();
return Result{Status{RippledError::rpcINTERNAL}};
auto error = std::move(result).error();
Result errorResult{std::move(error.status)};
errorResult.warnings = std::move(error.warnings);
return errorResult;
}
return buildResponseImpl(ctx);
}
/**
@@ -317,6 +315,53 @@ private:
{
return handlerProvider_->contains(method) || forwardingProxy_.isProxied(method);
}
Result
buildResponseImpl(web::Context const& ctx)
{
if (backend_->isTooBusy()) {
LOG(log_.error()) << "Database is too busy. Rejecting request";
notifyTooBusy(); // TODO: should we add ctx.method if we have it?
return Result{Status{RippledError::rpcTOO_BUSY}};
}
auto const method = handlerProvider_->getHandler(ctx.method);
if (!method) {
notifyUnknownCommand();
return Result{Status{RippledError::rpcUNKNOWN_COMMAND}};
}
try {
LOG(perfLog_.debug()) << ctx.tag() << " start executing rpc `" << ctx.method << '`';
auto const context = Context{
.yield = ctx.yield,
.session = ctx.session,
.isAdmin = ctx.isAdmin,
.clientIp = ctx.clientIp,
.apiVersion = ctx.apiVersion
};
auto v = (*method).process(ctx.params, context);
LOG(perfLog_.debug()) << ctx.tag() << " finish executing rpc `" << ctx.method << '`';
if (not v) {
notifyErrored(ctx.method);
}
return Result{std::move(v)};
} catch (data::DatabaseTimeout const& t) {
LOG(log_.error()) << "Database timeout";
notifyTooBusy();
return Result{Status{RippledError::rpcTOO_BUSY}};
} catch (std::exception const& ex) {
LOG(log_.error()) << ctx.tag() << "Caught exception: " << ex.what();
notifyInternalError();
return Result{Status{RippledError::rpcINTERNAL}};
}
}
};
} // namespace rpc

221
src/util/BlockingCache.hpp Normal file
View File

@@ -0,0 +1,221 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/Assert.hpp"
#include "util/Mutex.hpp"
#include <boost/asio/error.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/signals2/connection.hpp>
#include <boost/signals2/signal.hpp>
#include <boost/signals2/variadic_signal.hpp>
#include <atomic>
#include <concepts>
#include <expected>
#include <functional>
#include <optional>
#include <shared_mutex>
#include <utility>
namespace util {
/**
* @brief A thread-safe cache that blocks getting operations until the cache is updated
*
* @tparam ValueType The type of value to be cached
* @tparam ErrorType The type of error that can occur during updates
*/
template <typename ValueType, typename ErrorType>
requires(not std::same_as<ValueType, ErrorType>)
class BlockingCache {
public:
/**
* @brief Possible states of the cache
*/
enum class State { NoValue, Updating, HasValue };
private:
std::atomic<State> state_{State::NoValue};
util::Mutex<std::optional<ValueType>, std::shared_mutex> value_;
boost::signals2::signal<void(std::expected<ValueType, ErrorType>)> updateFinished_;
public:
/**
* @brief Default constructor - creates an empty cache
*/
BlockingCache() = default;
/**
* @brief Construct a cache with an initial value
* @param initialValue The value to initialize the cache with
*/
explicit BlockingCache(ValueType initialValue) : state_{State::HasValue}, value_(std::move(initialValue))
{
}
BlockingCache(BlockingCache&&) = delete;
BlockingCache(BlockingCache const&) = delete;
BlockingCache&
operator=(BlockingCache&&) = delete;
BlockingCache&
operator=(BlockingCache const&) = delete;
/**
* @brief Function type for cache update operations
* @details Called when the cache needs to be populated or refreshed
*/
using Updater = std::function<std::expected<ValueType, ErrorType>(boost::asio::yield_context)>;
/**
* @brief Function type to verify if a value should be cached
* @details Returns true if the value should be stored in the cache
*/
using Verifier = std::function<bool(ValueType const&)>;
/**
* @brief Asynchronously get a value from the cache, updating if necessary
*
* @param yield The asio yield context for coroutine suspension
* @param updater Function to generate a new value if needed
* @param verifier Function to validate whether a value should be cached
* @return std::expected<ValueType, ErrorType> The cached value or an error
*
* Depending on the current cache state, this will either:
* - Return the cached value if it's already present
* - Wait for an ongoing update to complete
* - Trigger a new update if the cache is empty
*/
[[nodiscard]] std::expected<ValueType, ErrorType>
asyncGet(boost::asio::yield_context yield, Updater updater, Verifier verifier)
{
switch (state_) {
case State::Updating: {
return wait(yield, std::move(updater), std::move(verifier));
}
case State::HasValue: {
auto const value = value_.template lock<std::shared_lock>();
ASSERT(value->has_value(), "Value should be presented when the cache is full");
return value->value();
}
case State::NoValue: {
return update(yield, std::move(updater), std::move(verifier));
}
};
std::unreachable();
}
/**
* @brief Force an update of the cache value
*
* @param yield The ASIO yield context for coroutine suspension
* @param updater Function to generate a new value
* @param verifier Function to validate whether a value should be cached
* @return std::expected<ValueType, ErrorType> The new value or an error
*
* Initiates a cache update operation regardless of current state.
* If another update is already in progress, waits for it to complete.
*/
[[nodiscard]] std::expected<ValueType, ErrorType>
update(boost::asio::yield_context yield, Updater updater, Verifier verifier)
{
if (state_ == State::Updating) {
return asyncGet(yield, std::move(updater), std::move(verifier));
}
state_ = State::Updating;
auto const result = updater(yield);
auto const shouldBeCached = result.has_value() and verifier(result.value());
if (shouldBeCached) {
value_.lock().get() = result.value();
state_ = State::HasValue;
} else {
state_ = State::NoValue;
value_.lock().get() = std::nullopt;
}
updateFinished_(result);
return result;
}
/**
* @brief Invalidates the currently cached value if present
*
* Clears the cache and sets its state to Empty.
* Has no effect if the cache is already empty or being updated.
*/
void
invalidate()
{
if (state_ == State::HasValue) {
state_ = State::NoValue;
value_.lock().get() = std::nullopt;
}
}
/**
* @brief Returns the current state of the cache
* @return Current cache state (Empty, Updating, or Full)
*/
[[nodiscard]] State
state() const
{
return state_;
}
private:
/**
* @brief Wait for an ongoing update to complete
*
* @param yield The ASIO yield context for coroutine suspension
* @param updater Function to generate a new value if needed
* @param verifier Function to validate whether a value should be cached
* @return std::expected<ValueType, ErrorType> The result of the ongoing update
*
* This method blocks the current coroutine until the ongoing update signals completion.
*/
std::expected<ValueType, ErrorType>
wait(boost::asio::yield_context yield, Updater updater, Verifier verifier)
{
boost::asio::steady_timer timer{yield.get_executor(), boost::asio::steady_timer::duration::max()};
boost::system::error_code errorCode;
std::optional<std::expected<ValueType, ErrorType>> result;
boost::signals2::scoped_connection slot =
updateFinished_.connect([yield, &timer, &result](std::expected<ValueType, ErrorType> value) {
boost::asio::spawn(yield, [&timer, &result, value = std::move(value)](auto&&) {
result = std::move(value);
timer.cancel();
});
});
if (state_ == State::Updating) {
timer.async_wait(yield[errorCode]);
ASSERT(result.has_value(), "There should be some value after waiting");
return std::move(result).value();
}
return asyncGet(yield, std::move(updater), std::move(verifier));
}
};
} // namespace util

View File

@@ -21,40 +21,26 @@
#include "util/Assert.hpp"
#include <boost/asio/spawn.hpp>
#include <boost/json/object.hpp>
#include <chrono>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <memory>
#include <string>
#include <unordered_set>
#include <utility>
namespace util {
void
ResponseExpirationCache::Entry::put(boost::json::object response)
ResponseExpirationCache::ResponseExpirationCache(
std::chrono::steady_clock::duration cacheTimeout,
std::unordered_set<std::string> const& cmds
)
: cacheTimeout_(cacheTimeout)
{
response_ = std::move(response);
lastUpdated_ = std::chrono::steady_clock::now();
}
std::optional<boost::json::object>
ResponseExpirationCache::Entry::get() const
{
return response_;
}
std::chrono::steady_clock::time_point
ResponseExpirationCache::Entry::lastUpdated() const
{
return lastUpdated_;
}
void
ResponseExpirationCache::Entry::invalidate()
{
response_.reset();
for (auto const& command : cmds) {
cache_.emplace(command, std::make_unique<CacheEntry>());
}
}
bool
@@ -63,38 +49,41 @@ ResponseExpirationCache::shouldCache(std::string const& cmd)
return cache_.contains(cmd);
}
std::optional<boost::json::object>
ResponseExpirationCache::get(std::string const& cmd) const
std::expected<boost::json::object, ResponseExpirationCache::Error>
ResponseExpirationCache::getOrUpdate(
boost::asio::yield_context yield,
std::string const& cmd,
Updater updater,
Verifier verifier
)
{
auto it = cache_.find(cmd);
if (it == cache_.end())
return std::nullopt;
ASSERT(it != cache_.end(), "Can't get a value which is not in the cache");
auto const& entry = it->second.lock<std::shared_lock>();
if (std::chrono::steady_clock::now() - entry->lastUpdated() > cacheTimeout_)
return std::nullopt;
auto& entry = it->second;
{
auto result = entry->asyncGet(yield, updater, verifier);
if (not result.has_value()) {
return std::unexpected{std::move(result).error()};
}
if (std::chrono::steady_clock::now() - result->lastUpdated < cacheTimeout_) {
return std::move(result)->response;
}
}
return entry->get();
}
void
ResponseExpirationCache::put(std::string const& cmd, boost::json::object const& response)
{
if (not shouldCache(cmd))
return;
ASSERT(cache_.contains(cmd), "Command is not in the cache: {}", cmd);
auto entry = cache_[cmd].lock<std::unique_lock>();
entry->put(response);
// Force update due to cache timeout
auto result = entry->update(yield, std::move(updater), std::move(verifier));
if (not result.has_value()) {
return std::unexpected{std::move(result).error()};
}
return std::move(result)->response;
}
void
ResponseExpirationCache::invalidate()
{
for (auto& [_, entry] : cache_) {
auto entryLock = entry.lock<std::unique_lock>();
entryLock->invalidate();
entry->invalidate();
}
}

View File

@@ -19,13 +19,15 @@
#pragma once
#include "util/Mutex.hpp"
#include "rpc/Errors.hpp"
#include "util/BlockingCache.hpp"
#include <boost/asio/spawn.hpp>
#include <boost/json/array.hpp>
#include <boost/json/object.hpp>
#include <chrono>
#include <optional>
#include <shared_mutex>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
@@ -34,91 +36,87 @@ namespace util {
/**
* @brief Cache of requests' responses with TTL support and configurable cachable commands
*
* This class implements a time-based expiration cache for RPC responses. It allows
* caching responses for specified commands and automatically invalidates them after
* a configured timeout period. The cache uses BlockingCache internally to handle
* concurrent access and updates.
*/
class ResponseExpirationCache {
public:
/**
* @brief A class to store a cache entry.
* @brief A data structure to store a cache entry with its timestamp
*/
class Entry {
std::chrono::steady_clock::time_point lastUpdated_;
std::optional<boost::json::object> response_;
public:
/**
* @brief Put a response into the cache
*
* @param response The response to store
*/
void
put(boost::json::object response);
/**
* @brief Get the response from the cache
*
* @return The response
*/
std::optional<boost::json::object>
get() const;
/**
* @brief Get the last time the cache was updated
*
* @return The last time the cache was updated
*/
std::chrono::steady_clock::time_point
lastUpdated() const;
/**
* @brief Invalidate the cache entry
*/
void
invalidate();
struct EntryData {
std::chrono::steady_clock::time_point lastUpdated; ///< When the entry was last updated
boost::json::object response; ///< The cached response data
};
std::chrono::steady_clock::duration cacheTimeout_;
std::unordered_map<std::string, util::Mutex<Entry, std::shared_mutex>> cache_;
/**
* @brief A data structure to represent errors that can occur during an update of the cache
*/
struct Error {
rpc::Status status; ///< The status code and message of the error
boost::json::array warnings; ///< Any warnings related to the request
bool
shouldCache(std::string const& cmd);
bool
operator==(Error const&) const = default;
};
using CacheEntry = util::BlockingCache<EntryData, Error>;
private:
std::chrono::steady_clock::duration cacheTimeout_;
std::unordered_map<std::string, std::unique_ptr<CacheEntry>> cache_;
public:
/**
* @brief Construct a new Cache object
* @brief Construct a new ResponseExpirationCache object
*
* @param cacheTimeout The time for cache entries to expire
* @param cmds The commands that should be cached
* @param cacheTimeout The time period after which cached entries expire
* @param cmds The commands that should be cached (requests for other commands won't be cached)
*/
ResponseExpirationCache(
std::chrono::steady_clock::duration cacheTimeout,
std::unordered_set<std::string> const& cmds
)
: cacheTimeout_(cacheTimeout)
{
for (auto const& command : cmds) {
cache_.emplace(command, Entry{});
}
}
);
/**
* @brief Get a response from the cache
* @brief Check if the given command should be cached
*
* @param cmd The command to check
* @return true if the command should be cached, false otherwise
*/
bool
shouldCache(std::string const& cmd);
using Updater = CacheEntry::Updater;
using Verifier = CacheEntry::Verifier;
/**
* @brief Get a cached response or update the cache if necessary
*
* This method returns a cached response if it exists and hasn't expired.
* If the cache entry is expired or doesn't exist, it calls the updater to
* generate a new value. If multiple coroutines request the same entry
* simultaneously, only one updater will be called while others wait.
*
* @note cmd must be one of the commands that are cached. There is an ASSERT() inside the function
*
* @param yield Asio yield context for coroutine suspension
* @param cmd The command to get the response for
* @return The response if it exists or std::nullopt otherwise
* @param updater Function to generate the response if not in cache or expired
* @param verifier Function to validate if a response should be cached
* @return The cached or newly generated response, or an error
*/
[[nodiscard]] std::optional<boost::json::object>
get(std::string const& cmd) const;
/**
* @brief Put a response into the cache if the request should be cached
*
* @param cmd The command to store the response for
* @param response The response to store
*/
void
put(std::string const& cmd, boost::json::object const& response);
[[nodiscard]] std::expected<boost::json::object, Error>
getOrUpdate(boost::asio::yield_context yield, std::string const& cmd, Updater updater, Verifier verifier);
/**
* @brief Invalidate all entries in the cache
*
* This causes all cached entries to be cleared, forcing the next access
* to generate new responses.
*/
void
invalidate();

View File

@@ -62,7 +62,7 @@ struct MockNgLoadBalancer : etlng::LoadBalancerInterface {
MOCK_METHOD(boost::json::value, toJson, (), (const, override));
MOCK_METHOD(std::optional<etl::ETLState>, getETLState, (), (noexcept, override));
using ForwardToRippledReturnType = std::expected<boost::json::object, rpc::ClioError>;
using ForwardToRippledReturnType = std::expected<boost::json::object, rpc::CombinedError>;
MOCK_METHOD(
ForwardToRippledReturnType,
forwardToRippled,

View File

@@ -140,6 +140,7 @@ target_sources(
util/async/AnyStrandTests.cpp
util/async/AsyncExecutionContextTests.cpp
util/BatchingTests.cpp
util/BlockingCacheTests.cpp
util/ConceptsTests.cpp
util/CoroutineGroupTests.cpp
util/LedgerUtilsTests.cpp

View File

@@ -645,7 +645,7 @@ struct LoadBalancerForwardToRippledErrorTestBundle {
std::string testName;
rpc::ClioError firstSourceError;
rpc::ClioError secondSourceError;
rpc::ClioError responseExpectedError;
rpc::CombinedError responseExpectedError;
};
struct LoadBalancerForwardToRippledErrorTests
@@ -776,7 +776,7 @@ TEST_F(LoadBalancerForwardToRippledTests, commandLineMissing)
runSpawn([&](boost::asio::yield_context yield) {
EXPECT_EQ(
loadBalancer->forwardToRippled(request, clientIP_, false, yield).error(),
rpc::ClioError::RpcCommandIsMissing
rpc::CombinedError{rpc::ClioError::RpcCommandIsMissing}
);
});
}

View File

@@ -672,7 +672,7 @@ struct LoadBalancerForwardToRippledErrorNgTestBundle {
std::string testName;
rpc::ClioError firstSourceError;
rpc::ClioError secondSourceError;
rpc::ClioError responseExpectedError;
rpc::CombinedError responseExpectedError;
};
struct LoadBalancerForwardToRippledErrorNgTests
@@ -803,7 +803,7 @@ TEST_F(LoadBalancerForwardToRippledNgTests, commandLineMissing)
runSpawn([&](boost::asio::yield_context yield) {
EXPECT_EQ(
loadBalancer->forwardToRippled(request, clientIP_, false, yield).error(),
rpc::ClioError::RpcCommandIsMissing
rpc::CombinedError{rpc::ClioError::RpcCommandIsMissing}
);
});
}

View File

@@ -262,7 +262,7 @@ TEST_P(RPCEngineFlowParameterTest, Test)
if (testBundle.response.has_value()) {
EXPECT_EQ(*response, testBundle.response.value());
} else {
EXPECT_TRUE(*status == testBundle.status.value());
EXPECT_EQ(*status, testBundle.status.value());
}
});
}
@@ -295,7 +295,7 @@ TEST_F(RPCEngineTest, ThrowDatabaseError)
auto const res = engine->buildResponse(ctx);
auto const status = std::get_if<rpc::Status>(&res.response);
ASSERT_TRUE(status != nullptr);
EXPECT_TRUE(*status == Status{RippledError::rpcTOO_BUSY});
EXPECT_EQ(*status, Status{RippledError::rpcTOO_BUSY});
});
}
@@ -327,7 +327,7 @@ TEST_F(RPCEngineTest, ThrowException)
auto const res = engine->buildResponse(ctx);
auto const status = std::get_if<rpc::Status>(&res.response);
ASSERT_TRUE(status != nullptr);
EXPECT_TRUE(*status == Status{RippledError::rpcINTERNAL});
EXPECT_EQ(*status, Status{RippledError::rpcINTERNAL});
});
}
@@ -453,7 +453,7 @@ TEST_P(RPCEngineCacheParameterTest, Test)
auto const res = engine->buildResponse(ctx);
auto const response = std::get_if<boost::json::object>(&res.response);
EXPECT_TRUE(*response == boost::json::parse(R"JSON({ "computed": "world_50"})JSON").as_object());
EXPECT_EQ(*response, boost::json::parse(R"JSON({ "computed": "world_50"})JSON").as_object());
});
}
}
@@ -498,7 +498,8 @@ TEST_F(RPCEngineTest, NotCacheIfErrorHappen)
auto const res = engine->buildResponse(ctx);
auto const error = std::get_if<rpc::Status>(&res.response);
EXPECT_TRUE(*error == rpc::Status{"Very custom error"});
ASSERT_NE(error, nullptr);
EXPECT_EQ(*error, rpc::Status{"Very custom error"});
});
}
}

View File

@@ -0,0 +1,252 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "util/AsioContextTestFixture.hpp"
#include "util/BlockingCache.hpp"
#include "util/NameGenerator.hpp"
#include <boost/asio/error.hpp>
#include <boost/asio/post.hpp>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <memory>
using testing::MockFunction;
using testing::Return;
using testing::StrictMock;
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
#include <chrono>
#include <expected>
#include <string>
struct BlockingCacheTest : SyncAsioContextTest {
using ErrorType = std::string;
using ValueType = int;
using Cache = util::BlockingCache<ValueType, ErrorType>;
using MockUpdater = StrictMock<MockFunction<std::expected<ValueType, ErrorType>(boost::asio::yield_context)>>;
using MockVerifier = StrictMock<MockFunction<bool(ValueType const&)>>;
std::unique_ptr<Cache> cache = std::make_unique<Cache>();
MockUpdater mockUpdater;
MockVerifier mockVerifier;
int const value = 42;
std::string error = "some error";
};
TEST_F(BlockingCacheTest, asyncGet_NoValueCacheUpdateSuccess)
{
EXPECT_CALL(mockUpdater, Call).WillOnce(Return(value));
EXPECT_CALL(mockVerifier, Call(value)).WillOnce(Return(true));
runSpawn([&](boost::asio::yield_context yield) {
auto result = cache->asyncGet(yield, mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
ASSERT_TRUE(result.has_value());
EXPECT_EQ(result.value(), 42);
});
}
TEST_F(BlockingCacheTest, asyncGet_NoValueCacheUpdateFailure)
{
EXPECT_CALL(mockUpdater, Call).WillOnce(Return(std::unexpected{error}));
runSpawn([&](boost::asio::yield_context yield) {
auto result = cache->asyncGet(yield, mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
ASSERT_FALSE(result.has_value());
EXPECT_EQ(result.error(), error);
});
}
TEST_F(BlockingCacheTest, asyncGet_NoValueCacheUpdateSuccessButVerifierRejects)
{
runSpawn([&](boost::asio::yield_context yield) {
std::expected<ValueType, ErrorType> result;
{
EXPECT_CALL(mockUpdater, Call).WillOnce(Return(value));
EXPECT_CALL(mockVerifier, Call(value)).WillOnce(Return(false));
result = cache->asyncGet(yield, mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
ASSERT_TRUE(result.has_value());
EXPECT_EQ(result.value(), value);
}
int const newValue = 24;
{
EXPECT_CALL(mockUpdater, Call).WillOnce(Return(newValue));
EXPECT_CALL(mockVerifier, Call(newValue)).WillOnce(Return(true));
result = cache->asyncGet(yield, mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
ASSERT_TRUE(result.has_value());
EXPECT_EQ(result.value(), newValue);
}
result = cache->asyncGet(yield, mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
ASSERT_TRUE(result.has_value());
EXPECT_EQ(result.value(), newValue);
});
}
TEST_F(BlockingCacheTest, asyncGet_HasValueCacheReturnsValue)
{
cache = std::make_unique<Cache>(value);
runSpawn([&](boost::asio::yield_context yield) {
auto result = cache->asyncGet(yield, mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
ASSERT_TRUE(result.has_value());
EXPECT_EQ(result.value(), value);
});
}
struct BlockingCacheWaitTestBundle {
bool updateSuccessful;
bool verifierAccepts;
std::string testName;
};
struct BlockingCacheWaitTest : BlockingCacheTest, testing::WithParamInterface<BlockingCacheWaitTestBundle> {};
TEST_P(BlockingCacheWaitTest, WaitForUpdate)
{
bool waitingCoroutineFinished = false;
auto waitingCoroutine = [&](boost::asio::yield_context yield) {
auto result = cache->asyncGet(yield, mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
if (GetParam().updateSuccessful) {
ASSERT_TRUE(result.has_value());
EXPECT_EQ(result.value(), value);
} else {
ASSERT_FALSE(result.has_value());
EXPECT_EQ(result.error(), error);
}
waitingCoroutineFinished = true;
};
EXPECT_CALL(mockUpdater, Call)
.WillOnce([this, &waitingCoroutine](boost::asio::yield_context yield) -> std::expected<ValueType, ErrorType> {
boost::asio::spawn(yield, waitingCoroutine);
if (GetParam().updateSuccessful) {
return value;
}
return std::unexpected{error};
});
if (GetParam().updateSuccessful)
EXPECT_CALL(mockVerifier, Call(value)).WillOnce(Return(GetParam().verifierAccepts));
runSpawnWithTimeout(std::chrono::seconds{1}, [&](boost::asio::yield_context yield) {
auto result = cache->asyncGet(yield, mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
if (GetParam().updateSuccessful) {
ASSERT_TRUE(result.has_value());
EXPECT_EQ(result.value(), value);
} else {
ASSERT_FALSE(result.has_value());
EXPECT_EQ(result.error(), error);
}
ASSERT_FALSE(waitingCoroutineFinished);
});
}
INSTANTIATE_TEST_SUITE_P(
BlockingCacheTest,
BlockingCacheWaitTest,
testing::Values(
BlockingCacheWaitTestBundle{
.updateSuccessful = true,
.verifierAccepts = true,
.testName = "UpdateSucceedsVerifierAccepts"
},
BlockingCacheWaitTestBundle{
.updateSuccessful = true,
.verifierAccepts = false,
.testName = "UpdateSucceedsVerifierRejects"
},
BlockingCacheWaitTestBundle{.updateSuccessful = false, .verifierAccepts = false, .testName = "UpdateFails"}
),
tests::util::kNAME_GENERATOR
);
TEST_F(BlockingCacheTest, InvalidateWhenStateIsNoValue)
{
ASSERT_EQ(cache->state(), Cache::State::NoValue);
cache->invalidate();
ASSERT_EQ(cache->state(), Cache::State::NoValue);
}
TEST_F(BlockingCacheTest, InvalidateWhenStateIsUpdating)
{
EXPECT_CALL(mockUpdater, Call).WillOnce([this](auto&&) {
EXPECT_EQ(cache->state(), Cache::State::Updating);
cache->invalidate();
EXPECT_EQ(cache->state(), Cache::State::Updating);
return value;
});
EXPECT_CALL(mockVerifier, Call(value)).WillOnce(Return(true));
runSpawn([&](boost::asio::yield_context yield) {
auto result = cache->asyncGet(yield, mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
ASSERT_TRUE(result.has_value());
ASSERT_EQ(result.value(), value);
ASSERT_EQ(cache->state(), Cache::State::HasValue);
});
}
TEST_F(BlockingCacheTest, InvalidateWhenStateIsHasValue)
{
cache = std::make_unique<Cache>(value);
ASSERT_EQ(cache->state(), Cache::State::HasValue);
cache->invalidate();
EXPECT_EQ(cache->state(), Cache::State::NoValue);
}
TEST_F(BlockingCacheTest, UpdateFromTwoCoroutinesHappensOnlyOnes)
{
auto waitingCoroutine = [&](boost::asio::yield_context yield) {
auto result = cache->update(yield, mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
ASSERT_TRUE(result.has_value());
ASSERT_EQ(result.value(), value);
};
EXPECT_CALL(mockUpdater, Call)
.WillOnce([this, &waitingCoroutine](boost::asio::yield_context yield) -> std::expected<ValueType, ErrorType> {
boost::asio::spawn(yield, waitingCoroutine);
return value;
});
EXPECT_CALL(mockVerifier, Call(value)).WillOnce(Return(true));
auto updatingCoroutine = [&](boost::asio::yield_context yield) {
auto const result = cache->update(yield, mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
EXPECT_TRUE(result.has_value());
ASSERT_EQ(result.value(), value);
};
runSpawnWithTimeout(std::chrono::seconds{1}, [&](boost::asio::yield_context yield) {
boost::asio::spawn(yield, updatingCoroutine);
});
}

View File

@@ -17,53 +17,292 @@
*/
//==============================================================================
#include "rpc/Errors.hpp"
#include "util/AsioContextTestFixture.hpp"
#include "util/MockAssert.hpp"
#include "util/ResponseExpirationCache.hpp"
#include <boost/asio/spawn.hpp>
#include <boost/json/object.hpp>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <chrono>
#include <string>
#include <thread>
#include <unordered_set>
using namespace util;
using testing::MockFunction;
using testing::Return;
using testing::StrictMock;
struct ResponseExpirationCacheTests : public ::testing::Test {
protected:
ResponseExpirationCache cache_{std::chrono::seconds{100}, {"key"}};
boost::json::object object_{{"key", "value"}};
struct ResponseExpirationCacheTest : SyncAsioContextTest {
using MockUpdater = StrictMock<MockFunction<
std::expected<ResponseExpirationCache::EntryData, ResponseExpirationCache::Error>(boost::asio::yield_context)>>;
using MockVerifier = StrictMock<MockFunction<bool(ResponseExpirationCache::EntryData const&)>>;
std::string const cmd = "server_info";
boost::json::object const obj = {{"some key", "some value"}};
MockUpdater mockUpdater;
MockVerifier mockVerifier;
};
TEST_F(ResponseExpirationCacheTests, PutAndGetNotExpired)
TEST_F(ResponseExpirationCacheTest, ShouldCacheDeterminesIfCommandIsCacheable)
{
EXPECT_FALSE(cache_.get("key").has_value());
std::unordered_set<std::string> cmds = {cmd, "account_info"};
ResponseExpirationCache cache{std::chrono::seconds(10), cmds};
cache_.put("key", object_);
auto result = cache_.get("key");
ASSERT_TRUE(result.has_value());
EXPECT_EQ(*result, object_);
result = cache_.get("key2");
ASSERT_FALSE(result.has_value());
for (auto const& c : cmds) {
EXPECT_TRUE(cache.shouldCache(c));
}
cache_.put("key2", object_);
result = cache_.get("key2");
ASSERT_FALSE(result.has_value());
EXPECT_FALSE(cache.shouldCache("account_tx"));
EXPECT_FALSE(cache.shouldCache("ledger"));
EXPECT_FALSE(cache.shouldCache("submit"));
EXPECT_FALSE(cache.shouldCache(""));
}
TEST_F(ResponseExpirationCacheTests, Invalidate)
TEST_F(ResponseExpirationCacheTest, ShouldCacheEmptySetMeansNothingCacheable)
{
cache_.put("key", object_);
cache_.invalidate();
EXPECT_FALSE(cache_.get("key").has_value());
std::unordered_set<std::string> const emptyCmds;
ResponseExpirationCache cache{std::chrono::seconds(10), emptyCmds};
EXPECT_FALSE(cache.shouldCache("server_info"));
EXPECT_FALSE(cache.shouldCache("account_info"));
EXPECT_FALSE(cache.shouldCache("any_command"));
EXPECT_FALSE(cache.shouldCache(""));
}
TEST_F(ResponseExpirationCacheTests, GetExpired)
TEST_F(ResponseExpirationCacheTest, ShouldCacheCaseMatchingIsRequired)
{
ResponseExpirationCache cache{std::chrono::milliseconds{1}, {"key"}};
auto const response = boost::json::object{{"key", "value"}};
std::unordered_set<std::string> const specificCmds = {cmd};
ResponseExpirationCache cache{std::chrono::seconds(10), specificCmds};
cache.put("key", response);
std::this_thread::sleep_for(std::chrono::milliseconds{2});
auto const result = cache.get("key");
EXPECT_FALSE(result);
EXPECT_TRUE(cache.shouldCache(cmd));
EXPECT_FALSE(cache.shouldCache("SERVER_INFO"));
EXPECT_FALSE(cache.shouldCache("Server_Info"));
}
TEST_F(ResponseExpirationCacheTest, GetOrUpdateNoValueInCacheCallsUpdaterAndVerifier)
{
ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}};
runSpawn([&](boost::asio::yield_context yield) {
EXPECT_CALL(mockUpdater, Call)
.WillOnce(Return(ResponseExpirationCache::EntryData{
.lastUpdated = std::chrono::steady_clock::now(),
.response = obj,
}));
EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true));
auto result =
cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
ASSERT_TRUE(result.has_value());
EXPECT_EQ(result.value(), obj);
});
}
TEST_F(ResponseExpirationCacheTest, GetOrUpdateExpiredValueInCacheCallsUpdaterAndVerifier)
{
ResponseExpirationCache cache{std::chrono::milliseconds(1), {cmd}};
runSpawn([&](boost::asio::yield_context yield) {
boost::json::object const expiredObject = {{"some key", "expired value"}};
EXPECT_CALL(mockUpdater, Call)
.WillOnce(Return(ResponseExpirationCache::EntryData{
.lastUpdated = std::chrono::steady_clock::now(),
.response = expiredObject,
}));
EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true));
auto result =
cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
ASSERT_TRUE(result.has_value());
EXPECT_EQ(result.value(), expiredObject);
std::this_thread::sleep_for(std::chrono::milliseconds(2));
EXPECT_CALL(mockUpdater, Call)
.WillOnce(Return(
ResponseExpirationCache::EntryData{.lastUpdated = std::chrono::steady_clock::now(), .response = obj}
));
EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true));
result = cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
ASSERT_TRUE(result.has_value());
EXPECT_EQ(result.value(), obj);
});
}
TEST_F(ResponseExpirationCacheTest, GetOrUpdateCachedValueNotExpiredDoesNotCallUpdaterOrVerifier)
{
ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}};
runSpawn([&](boost::asio::yield_context yield) {
// First call to populate cache
EXPECT_CALL(mockUpdater, Call)
.WillOnce(Return(ResponseExpirationCache::EntryData{
.lastUpdated = std::chrono::steady_clock::now(),
.response = obj,
}));
EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true));
auto result =
cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
ASSERT_TRUE(result.has_value());
EXPECT_EQ(result.value(), obj);
// Second call should use cached value and not call updater/verifier
result = cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
ASSERT_TRUE(result.has_value());
EXPECT_EQ(result.value(), obj);
});
}
TEST_F(ResponseExpirationCacheTest, GetOrUpdateHandlesErrorFromUpdater)
{
ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}};
ResponseExpirationCache::Error const error{
.status = rpc::Status{rpc::ClioError::EtlConnectionError}, .warnings = {}
};
runSpawn([&](boost::asio::yield_context yield) {
EXPECT_CALL(mockUpdater, Call).WillOnce(Return(std::unexpected(error)));
auto result =
cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
ASSERT_FALSE(result.has_value());
EXPECT_EQ(result.error(), error);
});
}
TEST_F(ResponseExpirationCacheTest, GetOrUpdateVerifierRejection)
{
ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}};
runSpawn([&](boost::asio::yield_context yield) {
EXPECT_CALL(mockUpdater, Call)
.WillOnce(Return(ResponseExpirationCache::EntryData{
.lastUpdated = std::chrono::steady_clock::now(),
.response = obj,
}));
EXPECT_CALL(mockVerifier, Call).WillOnce(Return(false));
auto result =
cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
ASSERT_TRUE(result.has_value());
EXPECT_EQ(result.value(), obj);
boost::json::object const anotherObj = {{"some key", "another value"}};
EXPECT_CALL(mockUpdater, Call)
.WillOnce(Return(ResponseExpirationCache::EntryData{
.lastUpdated = std::chrono::steady_clock::now(),
.response = anotherObj,
}));
EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true));
result = cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
ASSERT_TRUE(result.has_value());
EXPECT_EQ(result.value(), anotherObj);
});
}
TEST_F(ResponseExpirationCacheTest, GetOrUpdateMultipleConcurrentUpdates)
{
ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}};
bool waitingCoroutineFinished = false;
auto waitingCoroutine = [&](boost::asio::yield_context yield) {
auto result =
cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
ASSERT_TRUE(result.has_value());
EXPECT_EQ(result.value(), obj);
waitingCoroutineFinished = true;
};
EXPECT_CALL(mockUpdater, Call)
.WillOnce(
[this, &waitingCoroutine](boost::asio::yield_context yield
) -> std::expected<ResponseExpirationCache::EntryData, ResponseExpirationCache::Error> {
boost::asio::spawn(yield, waitingCoroutine);
return ResponseExpirationCache::EntryData{
.lastUpdated = std::chrono::steady_clock::now(),
.response = obj,
};
}
);
EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true));
runSpawnWithTimeout(std::chrono::seconds{1}, [&](boost::asio::yield_context yield) {
auto result =
cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
ASSERT_TRUE(result.has_value());
EXPECT_EQ(result.value(), obj);
ASSERT_FALSE(waitingCoroutineFinished);
});
}
TEST_F(ResponseExpirationCacheTest, InvalidateForcesRefresh)
{
ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}};
runSpawn([&](boost::asio::yield_context yield) {
boost::json::object oldObject = {{"some key", "old value"}};
EXPECT_CALL(mockUpdater, Call)
.WillOnce(Return(ResponseExpirationCache::EntryData{
.lastUpdated = std::chrono::steady_clock::now(),
.response = oldObject,
}));
EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true));
auto result =
cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
ASSERT_TRUE(result.has_value());
EXPECT_EQ(result.value(), oldObject);
cache.invalidate();
EXPECT_CALL(mockUpdater, Call)
.WillOnce(Return(ResponseExpirationCache::EntryData{
.lastUpdated = std::chrono::steady_clock::now(),
.response = obj,
}));
EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true));
result = cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
ASSERT_TRUE(result.has_value());
EXPECT_EQ(result.value(), obj);
});
}
struct ResponseExpirationCacheAssertTest : common::util::WithMockAssert, ResponseExpirationCacheTest {};
TEST_F(ResponseExpirationCacheAssertTest, NonCacheableCommandThrowsAssertion)
{
ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}};
ASSERT_FALSE(cache.shouldCache("non_cacheable_command"));
runSpawn([&](boost::asio::yield_context yield) {
EXPECT_CLIO_ASSERT_FAIL({
[[maybe_unused]]
auto const v = cache.getOrUpdate(
yield, "non_cacheable_command", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()
);
});
});
}