mirror of
https://github.com/XRPLF/clio.git
synced 2025-12-06 17:27:58 +00:00
chore: Remove using blocking cache (#2328)
BlockingCache has a bug so reverting its usage for now.
This commit is contained in:
@@ -57,7 +57,6 @@
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <utility>
|
||||
#include <variant>
|
||||
#include <vector>
|
||||
|
||||
using namespace util::config;
|
||||
@@ -278,40 +277,46 @@ LoadBalancer::forwardToRippled(
|
||||
return std::unexpected{rpc::ClioError::RpcCommandIsMissing};
|
||||
|
||||
auto const cmd = boost::json::value_to<std::string>(request.at("command"));
|
||||
|
||||
if (forwardingCache_ and forwardingCache_->shouldCache(cmd)) {
|
||||
bool servedFromCache = true;
|
||||
auto updater = [this, &request, &clientIp, &servedFromCache, isAdmin](boost::asio::yield_context yield)
|
||||
-> std::expected<util::ResponseExpirationCache::EntryData, util::ResponseExpirationCache::Error> {
|
||||
servedFromCache = false;
|
||||
auto result = forwardToRippledImpl(request, clientIp, isAdmin, yield);
|
||||
if (result.has_value()) {
|
||||
return util::ResponseExpirationCache::EntryData{
|
||||
.lastUpdated = std::chrono::steady_clock::now(), .response = std::move(result).value()
|
||||
};
|
||||
}
|
||||
return std::unexpected{
|
||||
util::ResponseExpirationCache::Error{.status = rpc::Status{result.error()}, .warnings = {}}
|
||||
};
|
||||
};
|
||||
|
||||
auto result = forwardingCache_->getOrUpdate(
|
||||
yield, cmd, std::move(updater), [](util::ResponseExpirationCache::EntryData const& entry) {
|
||||
return not entry.response.contains("error");
|
||||
}
|
||||
);
|
||||
if (servedFromCache) {
|
||||
++forwardingCounters_.cacheHit.get();
|
||||
if (forwardingCache_) {
|
||||
if (auto cachedResponse = forwardingCache_->get(cmd); cachedResponse) {
|
||||
forwardingCounters_.cacheHit.get() += 1;
|
||||
return std::move(cachedResponse).value();
|
||||
}
|
||||
if (result.has_value()) {
|
||||
return std::move(result).value();
|
||||
}
|
||||
forwardingCounters_.cacheMiss.get() += 1;
|
||||
|
||||
ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests.");
|
||||
std::size_t sourceIdx = randomGenerator_->uniform(0ul, sources_.size() - 1);
|
||||
|
||||
auto numAttempts = 0u;
|
||||
|
||||
auto xUserValue = isAdmin ? kADMIN_FORWARDING_X_USER_VALUE : kUSER_FORWARDING_X_USER_VALUE;
|
||||
|
||||
std::optional<boost::json::object> response;
|
||||
rpc::ClioError error = rpc::ClioError::EtlConnectionError;
|
||||
while (numAttempts < sources_.size()) {
|
||||
auto [res, duration] =
|
||||
util::timed([&]() { return sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield); });
|
||||
if (res) {
|
||||
forwardingCounters_.successDuration.get() += duration;
|
||||
response = std::move(res).value();
|
||||
break;
|
||||
}
|
||||
auto const combinedError = result.error().status.code;
|
||||
ASSERT(std::holds_alternative<rpc::ClioError>(combinedError), "There could be only ClioError here");
|
||||
return std::unexpected{std::get<rpc::ClioError>(combinedError)};
|
||||
forwardingCounters_.failDuration.get() += duration;
|
||||
++forwardingCounters_.retries.get();
|
||||
error = std::max(error, res.error()); // Choose the best result between all sources
|
||||
|
||||
sourceIdx = (sourceIdx + 1) % sources_.size();
|
||||
++numAttempts;
|
||||
}
|
||||
|
||||
return forwardToRippledImpl(request, clientIp, isAdmin, yield);
|
||||
if (response) {
|
||||
if (forwardingCache_ and not response->contains("error"))
|
||||
forwardingCache_->put(cmd, *response);
|
||||
return std::move(response).value();
|
||||
}
|
||||
|
||||
return std::unexpected{error};
|
||||
}
|
||||
|
||||
boost::json::value
|
||||
@@ -402,47 +407,4 @@ LoadBalancer::chooseForwardingSource()
|
||||
}
|
||||
}
|
||||
|
||||
std::expected<boost::json::object, rpc::CombinedError>
|
||||
LoadBalancer::forwardToRippledImpl(
|
||||
boost::json::object const& request,
|
||||
std::optional<std::string> const& clientIp,
|
||||
bool const isAdmin,
|
||||
boost::asio::yield_context yield
|
||||
)
|
||||
{
|
||||
++forwardingCounters_.cacheMiss.get();
|
||||
|
||||
ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests.");
|
||||
std::size_t sourceIdx = randomGenerator_->uniform(0ul, sources_.size() - 1);
|
||||
|
||||
auto numAttempts = 0u;
|
||||
|
||||
auto xUserValue = isAdmin ? kADMIN_FORWARDING_X_USER_VALUE : kUSER_FORWARDING_X_USER_VALUE;
|
||||
|
||||
std::optional<boost::json::object> response;
|
||||
rpc::ClioError error = rpc::ClioError::EtlConnectionError;
|
||||
while (numAttempts < sources_.size()) {
|
||||
auto [res, duration] =
|
||||
util::timed([&]() { return sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield); });
|
||||
|
||||
if (res) {
|
||||
forwardingCounters_.successDuration.get() += duration;
|
||||
response = std::move(res).value();
|
||||
break;
|
||||
}
|
||||
forwardingCounters_.failDuration.get() += duration;
|
||||
++forwardingCounters_.retries.get();
|
||||
error = std::max(error, res.error()); // Choose the best result between all sources
|
||||
|
||||
sourceIdx = (sourceIdx + 1) % sources_.size();
|
||||
++numAttempts;
|
||||
}
|
||||
|
||||
if (response.has_value()) {
|
||||
return std::move(response).value();
|
||||
}
|
||||
|
||||
return std::unexpected{error};
|
||||
}
|
||||
|
||||
} // namespace etl
|
||||
|
||||
@@ -49,6 +49,7 @@
|
||||
#include <concepts>
|
||||
#include <cstdint>
|
||||
#include <expected>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <string>
|
||||
@@ -281,14 +282,6 @@ private:
|
||||
*/
|
||||
void
|
||||
chooseForwardingSource();
|
||||
|
||||
std::expected<boost::json::object, rpc::CombinedError>
|
||||
forwardToRippledImpl(
|
||||
boost::json::object const& request,
|
||||
std::optional<std::string> const& clientIp,
|
||||
bool isAdmin,
|
||||
boost::asio::yield_context yield
|
||||
);
|
||||
};
|
||||
|
||||
} // namespace etl
|
||||
|
||||
@@ -58,7 +58,6 @@
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <utility>
|
||||
#include <variant>
|
||||
#include <vector>
|
||||
|
||||
using namespace util::config;
|
||||
@@ -284,40 +283,46 @@ LoadBalancer::forwardToRippled(
|
||||
return std::unexpected{rpc::ClioError::RpcCommandIsMissing};
|
||||
|
||||
auto const cmd = boost::json::value_to<std::string>(request.at("command"));
|
||||
|
||||
if (forwardingCache_ and forwardingCache_->shouldCache(cmd)) {
|
||||
bool servedFromCache = true;
|
||||
auto updater = [this, &request, &clientIp, &servedFromCache, isAdmin](boost::asio::yield_context yield)
|
||||
-> std::expected<util::ResponseExpirationCache::EntryData, util::ResponseExpirationCache::Error> {
|
||||
servedFromCache = false;
|
||||
auto result = forwardToRippledImpl(request, clientIp, isAdmin, yield);
|
||||
if (result.has_value()) {
|
||||
return util::ResponseExpirationCache::EntryData{
|
||||
.lastUpdated = std::chrono::steady_clock::now(), .response = std::move(result).value()
|
||||
};
|
||||
}
|
||||
return std::unexpected{
|
||||
util::ResponseExpirationCache::Error{.status = rpc::Status{result.error()}, .warnings = {}}
|
||||
};
|
||||
};
|
||||
|
||||
auto result = forwardingCache_->getOrUpdate(
|
||||
yield, cmd, std::move(updater), [](util::ResponseExpirationCache::EntryData const& entry) {
|
||||
return not entry.response.contains("error");
|
||||
}
|
||||
);
|
||||
if (servedFromCache) {
|
||||
++forwardingCounters_.cacheHit.get();
|
||||
if (forwardingCache_) {
|
||||
if (auto cachedResponse = forwardingCache_->get(cmd); cachedResponse) {
|
||||
forwardingCounters_.cacheHit.get() += 1;
|
||||
return std::move(cachedResponse).value();
|
||||
}
|
||||
if (result.has_value()) {
|
||||
return std::move(result).value();
|
||||
}
|
||||
forwardingCounters_.cacheMiss.get() += 1;
|
||||
|
||||
ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests.");
|
||||
std::size_t sourceIdx = randomGenerator_->uniform(0ul, sources_.size() - 1);
|
||||
|
||||
auto numAttempts = 0u;
|
||||
|
||||
auto xUserValue = isAdmin ? kADMIN_FORWARDING_X_USER_VALUE : kUSER_FORWARDING_X_USER_VALUE;
|
||||
|
||||
std::optional<boost::json::object> response;
|
||||
rpc::ClioError error = rpc::ClioError::EtlConnectionError;
|
||||
while (numAttempts < sources_.size()) {
|
||||
auto [res, duration] =
|
||||
util::timed([&]() { return sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield); });
|
||||
if (res) {
|
||||
forwardingCounters_.successDuration.get() += duration;
|
||||
response = std::move(res).value();
|
||||
break;
|
||||
}
|
||||
auto const combinedError = result.error().status.code;
|
||||
ASSERT(std::holds_alternative<rpc::ClioError>(combinedError), "There could be only ClioError here");
|
||||
return std::unexpected{std::get<rpc::ClioError>(combinedError)};
|
||||
forwardingCounters_.failDuration.get() += duration;
|
||||
++forwardingCounters_.retries.get();
|
||||
error = std::max(error, res.error()); // Choose the best result between all sources
|
||||
|
||||
sourceIdx = (sourceIdx + 1) % sources_.size();
|
||||
++numAttempts;
|
||||
}
|
||||
|
||||
return forwardToRippledImpl(request, clientIp, isAdmin, yield);
|
||||
if (response) {
|
||||
if (forwardingCache_ and not response->contains("error"))
|
||||
forwardingCache_->put(cmd, *response);
|
||||
return std::move(response).value();
|
||||
}
|
||||
|
||||
return std::unexpected{error};
|
||||
}
|
||||
|
||||
boost::json::value
|
||||
@@ -408,47 +413,4 @@ LoadBalancer::chooseForwardingSource()
|
||||
}
|
||||
}
|
||||
|
||||
std::expected<boost::json::object, rpc::CombinedError>
|
||||
LoadBalancer::forwardToRippledImpl(
|
||||
boost::json::object const& request,
|
||||
std::optional<std::string> const& clientIp,
|
||||
bool isAdmin,
|
||||
boost::asio::yield_context yield
|
||||
)
|
||||
{
|
||||
++forwardingCounters_.cacheMiss.get();
|
||||
|
||||
ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests.");
|
||||
std::size_t sourceIdx = randomGenerator_->uniform(0ul, sources_.size() - 1);
|
||||
|
||||
auto numAttempts = 0u;
|
||||
|
||||
auto xUserValue = isAdmin ? kADMIN_FORWARDING_X_USER_VALUE : kUSER_FORWARDING_X_USER_VALUE;
|
||||
|
||||
std::optional<boost::json::object> response;
|
||||
rpc::ClioError error = rpc::ClioError::EtlConnectionError;
|
||||
while (numAttempts < sources_.size()) {
|
||||
auto [res, duration] =
|
||||
util::timed([&]() { return sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield); });
|
||||
|
||||
if (res) {
|
||||
forwardingCounters_.successDuration.get() += duration;
|
||||
response = std::move(res).value();
|
||||
break;
|
||||
}
|
||||
forwardingCounters_.failDuration.get() += duration;
|
||||
++forwardingCounters_.retries.get();
|
||||
error = std::max(error, res.error()); // Choose the best result between all sources
|
||||
|
||||
sourceIdx = (sourceIdx + 1) % sources_.size();
|
||||
++numAttempts;
|
||||
}
|
||||
|
||||
if (response.has_value()) {
|
||||
return std::move(response).value();
|
||||
}
|
||||
|
||||
return std::unexpected{error};
|
||||
}
|
||||
|
||||
} // namespace etlng
|
||||
|
||||
@@ -282,14 +282,6 @@ private:
|
||||
*/
|
||||
void
|
||||
chooseForwardingSource();
|
||||
|
||||
std::expected<boost::json::object, rpc::CombinedError>
|
||||
forwardToRippledImpl(
|
||||
boost::json::object const& request,
|
||||
std::optional<std::string> const& clientIp,
|
||||
bool isAdmin,
|
||||
boost::asio::yield_context yield
|
||||
);
|
||||
};
|
||||
|
||||
} // namespace etlng
|
||||
|
||||
@@ -157,48 +157,55 @@ public:
|
||||
return forwardingProxy_.forward(ctx);
|
||||
}
|
||||
|
||||
if (not ctx.isAdmin and responseCache_ and responseCache_->shouldCache(ctx.method)) {
|
||||
auto updater = [this, &ctx](boost::asio::yield_context)
|
||||
-> std::expected<util::ResponseExpirationCache::EntryData, util::ResponseExpirationCache::Error> {
|
||||
auto result = buildResponseImpl(ctx);
|
||||
|
||||
auto const extracted =
|
||||
[&result]() -> std::expected<boost::json::object, util::ResponseExpirationCache::Error> {
|
||||
if (result.response.has_value()) {
|
||||
return std::move(result.response).value();
|
||||
}
|
||||
return std::unexpected{util::ResponseExpirationCache::Error{
|
||||
.status = std::move(result.response).error(), .warnings = std::move(result.warnings)
|
||||
}};
|
||||
}();
|
||||
|
||||
if (extracted.has_value()) {
|
||||
return util::ResponseExpirationCache::EntryData{
|
||||
.lastUpdated = std::chrono::steady_clock::now(), .response = std::move(extracted).value()
|
||||
};
|
||||
}
|
||||
return std::unexpected{std::move(extracted).error()};
|
||||
};
|
||||
|
||||
auto result = responseCache_->getOrUpdate(
|
||||
ctx.yield,
|
||||
ctx.method,
|
||||
std::move(updater),
|
||||
[&ctx](util::ResponseExpirationCache::EntryData const& entry) {
|
||||
return not ctx.isAdmin and not entry.response.contains("error");
|
||||
}
|
||||
);
|
||||
if (result.has_value()) {
|
||||
return Result{std::move(result).value()};
|
||||
}
|
||||
|
||||
auto error = std::move(result).error();
|
||||
Result errorResult{std::move(error.status)};
|
||||
errorResult.warnings = std::move(error.warnings);
|
||||
return errorResult;
|
||||
if (not ctx.isAdmin and responseCache_) {
|
||||
if (auto res = responseCache_->get(ctx.method); res.has_value())
|
||||
return Result{std::move(res).value()};
|
||||
}
|
||||
|
||||
return buildResponseImpl(ctx);
|
||||
if (backend_->isTooBusy()) {
|
||||
LOG(log_.error()) << "Database is too busy. Rejecting request";
|
||||
notifyTooBusy(); // TODO: should we add ctx.method if we have it?
|
||||
return Result{Status{RippledError::rpcTOO_BUSY}};
|
||||
}
|
||||
|
||||
auto const method = handlerProvider_->getHandler(ctx.method);
|
||||
if (!method) {
|
||||
notifyUnknownCommand();
|
||||
return Result{Status{RippledError::rpcUNKNOWN_COMMAND}};
|
||||
}
|
||||
|
||||
try {
|
||||
LOG(perfLog_.debug()) << ctx.tag() << " start executing rpc `" << ctx.method << '`';
|
||||
|
||||
auto const context = Context{
|
||||
.yield = ctx.yield,
|
||||
.session = ctx.session,
|
||||
.isAdmin = ctx.isAdmin,
|
||||
.clientIp = ctx.clientIp,
|
||||
.apiVersion = ctx.apiVersion
|
||||
};
|
||||
auto v = (*method).process(ctx.params, context);
|
||||
|
||||
LOG(perfLog_.debug()) << ctx.tag() << " finish executing rpc `" << ctx.method << '`';
|
||||
|
||||
if (not v) {
|
||||
notifyErrored(ctx.method);
|
||||
} else if (not ctx.isAdmin and responseCache_) {
|
||||
responseCache_->put(ctx.method, v.result->as_object());
|
||||
}
|
||||
|
||||
return Result{std::move(v)};
|
||||
} catch (data::DatabaseTimeout const& t) {
|
||||
LOG(log_.error()) << "Database timeout";
|
||||
notifyTooBusy();
|
||||
|
||||
return Result{Status{RippledError::rpcTOO_BUSY}};
|
||||
} catch (std::exception const& ex) {
|
||||
LOG(log_.error()) << ctx.tag() << "Caught exception: " << ex.what();
|
||||
notifyInternalError();
|
||||
|
||||
return Result{Status{RippledError::rpcINTERNAL}};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -16,31 +16,44 @@
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include "util/ResponseExpirationCache.hpp"
|
||||
|
||||
#include "util/Assert.hpp"
|
||||
|
||||
#include <boost/asio/spawn.hpp>
|
||||
#include <boost/json/object.hpp>
|
||||
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <optional>
|
||||
#include <shared_mutex>
|
||||
#include <string>
|
||||
#include <unordered_set>
|
||||
#include <utility>
|
||||
|
||||
namespace util {
|
||||
|
||||
ResponseExpirationCache::ResponseExpirationCache(
|
||||
std::chrono::steady_clock::duration cacheTimeout,
|
||||
std::unordered_set<std::string> const& cmds
|
||||
)
|
||||
: cacheTimeout_(cacheTimeout)
|
||||
void
|
||||
ResponseExpirationCache::Entry::put(boost::json::object response)
|
||||
{
|
||||
for (auto const& command : cmds) {
|
||||
cache_.emplace(command, std::make_unique<CacheEntry>());
|
||||
}
|
||||
response_ = std::move(response);
|
||||
lastUpdated_ = std::chrono::steady_clock::now();
|
||||
}
|
||||
|
||||
std::optional<boost::json::object>
|
||||
ResponseExpirationCache::Entry::get() const
|
||||
{
|
||||
return response_;
|
||||
}
|
||||
|
||||
std::chrono::steady_clock::time_point
|
||||
ResponseExpirationCache::Entry::lastUpdated() const
|
||||
{
|
||||
return lastUpdated_;
|
||||
}
|
||||
|
||||
void
|
||||
ResponseExpirationCache::Entry::invalidate()
|
||||
{
|
||||
response_.reset();
|
||||
}
|
||||
|
||||
bool
|
||||
@@ -49,41 +62,38 @@ ResponseExpirationCache::shouldCache(std::string const& cmd)
|
||||
return cache_.contains(cmd);
|
||||
}
|
||||
|
||||
std::expected<boost::json::object, ResponseExpirationCache::Error>
|
||||
ResponseExpirationCache::getOrUpdate(
|
||||
boost::asio::yield_context yield,
|
||||
std::string const& cmd,
|
||||
Updater updater,
|
||||
Verifier verifier
|
||||
)
|
||||
std::optional<boost::json::object>
|
||||
ResponseExpirationCache::get(std::string const& cmd) const
|
||||
{
|
||||
auto it = cache_.find(cmd);
|
||||
ASSERT(it != cache_.end(), "Can't get a value which is not in the cache");
|
||||
if (it == cache_.end())
|
||||
return std::nullopt;
|
||||
|
||||
auto& entry = it->second;
|
||||
{
|
||||
auto result = entry->asyncGet(yield, updater, verifier);
|
||||
if (not result.has_value()) {
|
||||
return std::unexpected{std::move(result).error()};
|
||||
}
|
||||
if (std::chrono::steady_clock::now() - result->lastUpdated < cacheTimeout_) {
|
||||
return std::move(result)->response;
|
||||
}
|
||||
}
|
||||
auto const& entry = it->second.lock<std::shared_lock>();
|
||||
if (std::chrono::steady_clock::now() - entry->lastUpdated() > cacheTimeout_)
|
||||
return std::nullopt;
|
||||
|
||||
// Force update due to cache timeout
|
||||
auto result = entry->update(yield, std::move(updater), std::move(verifier));
|
||||
if (not result.has_value()) {
|
||||
return std::unexpected{std::move(result).error()};
|
||||
}
|
||||
return std::move(result)->response;
|
||||
return entry->get();
|
||||
}
|
||||
|
||||
void
|
||||
ResponseExpirationCache::put(std::string const& cmd, boost::json::object const& response)
|
||||
{
|
||||
if (not shouldCache(cmd))
|
||||
return;
|
||||
|
||||
ASSERT(cache_.contains(cmd), "Command is not in the cache: {}", cmd);
|
||||
|
||||
auto entry = cache_[cmd].lock<std::unique_lock>();
|
||||
entry->put(response);
|
||||
}
|
||||
|
||||
void
|
||||
ResponseExpirationCache::invalidate()
|
||||
{
|
||||
for (auto& [_, entry] : cache_) {
|
||||
entry->invalidate();
|
||||
auto entryLock = entry.lock<std::unique_lock>();
|
||||
entryLock->invalidate();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -16,18 +16,15 @@
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "rpc/Errors.hpp"
|
||||
#include "util/BlockingCache.hpp"
|
||||
#include "util/Mutex.hpp"
|
||||
|
||||
#include <boost/asio/spawn.hpp>
|
||||
#include <boost/json/array.hpp>
|
||||
#include <boost/json/object.hpp>
|
||||
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <shared_mutex>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <unordered_set>
|
||||
@@ -36,89 +33,94 @@ namespace util {
|
||||
|
||||
/**
|
||||
* @brief Cache of requests' responses with TTL support and configurable cacheable commands
|
||||
*
|
||||
* This class implements a time-based expiration cache for RPC responses. It allows
|
||||
* caching responses for specified commands and automatically invalidates them after
|
||||
* a configured timeout period. The cache uses BlockingCache internally to handle
|
||||
* concurrent access and updates.
|
||||
*/
|
||||
class ResponseExpirationCache {
|
||||
public:
|
||||
/**
|
||||
* @brief A data structure to store a cache entry with its timestamp
|
||||
* @brief A class to store a cache entry.
|
||||
*/
|
||||
struct EntryData {
|
||||
std::chrono::steady_clock::time_point lastUpdated; ///< When the entry was last updated
|
||||
boost::json::object response; ///< The cached response data
|
||||
class Entry {
|
||||
std::chrono::steady_clock::time_point lastUpdated_;
|
||||
std::optional<boost::json::object> response_;
|
||||
|
||||
public:
|
||||
/**
|
||||
* @brief Put a response into the cache
|
||||
*
|
||||
* @param response The response to store
|
||||
*/
|
||||
void
|
||||
put(boost::json::object response);
|
||||
|
||||
/**
|
||||
* @brief Get the response from the cache
|
||||
*
|
||||
* @return The response
|
||||
*/
|
||||
std::optional<boost::json::object>
|
||||
get() const;
|
||||
|
||||
/**
|
||||
* @brief Get the last time the cache was updated
|
||||
*
|
||||
* @return The last time the cache was updated
|
||||
*/
|
||||
std::chrono::steady_clock::time_point
|
||||
lastUpdated() const;
|
||||
|
||||
/**
|
||||
* @brief Invalidate the cache entry
|
||||
*/
|
||||
void
|
||||
invalidate();
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief A data structure to represent errors that can occur during an update of the cache
|
||||
*/
|
||||
struct Error {
|
||||
rpc::Status status; ///< The status code and message of the error
|
||||
boost::json::array warnings; ///< Any warnings related to the request
|
||||
|
||||
bool
|
||||
operator==(Error const&) const = default;
|
||||
};
|
||||
|
||||
using CacheEntry = util::BlockingCache<EntryData, Error>;
|
||||
|
||||
private:
|
||||
std::chrono::steady_clock::duration cacheTimeout_;
|
||||
std::unordered_map<std::string, std::unique_ptr<CacheEntry>> cache_;
|
||||
std::unordered_map<std::string, util::Mutex<Entry, std::shared_mutex>> cache_;
|
||||
|
||||
bool
|
||||
shouldCache(std::string const& cmd);
|
||||
|
||||
public:
|
||||
/**
|
||||
* @brief Construct a new ResponseExpirationCache object
|
||||
* @brief Construct a new Cache object
|
||||
*
|
||||
* @param cacheTimeout The time period after which cached entries expire
|
||||
* @param cmds The commands that should be cached (requests for other commands won't be cached)
|
||||
* @param cacheTimeout The time for cache entries to expire
|
||||
* @param cmds The commands that should be cached
|
||||
*/
|
||||
ResponseExpirationCache(
|
||||
std::chrono::steady_clock::duration cacheTimeout,
|
||||
std::unordered_set<std::string> const& cmds
|
||||
);
|
||||
)
|
||||
: cacheTimeout_(cacheTimeout)
|
||||
{
|
||||
for (auto const& command : cmds) {
|
||||
cache_.emplace(command, Entry{});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Check if the given command should be cached
|
||||
* @brief Get a response from the cache
|
||||
*
|
||||
* @param cmd The command to check
|
||||
* @return true if the command should be cached, false otherwise
|
||||
*/
|
||||
bool
|
||||
shouldCache(std::string const& cmd);
|
||||
|
||||
using Updater = CacheEntry::Updater;
|
||||
using Verifier = CacheEntry::Verifier;
|
||||
|
||||
/**
|
||||
* @brief Get a cached response or update the cache if necessary
|
||||
*
|
||||
* This method returns a cached response if it exists and hasn't expired.
|
||||
* If the cache entry is expired or doesn't exist, it calls the updater to
|
||||
* generate a new value. If multiple coroutines request the same entry
|
||||
* simultaneously, only one updater will be called while others wait.
|
||||
*
|
||||
* @note cmd must be one of the commands that are cached. There is an ASSERT() inside the function
|
||||
*
|
||||
* @param yield Asio yield context for coroutine suspension
|
||||
* @param cmd The command to get the response for
|
||||
* @param updater Function to generate the response if not in cache or expired
|
||||
* @param verifier Function to validate if a response should be cached
|
||||
* @return The cached or newly generated response, or an error
|
||||
* @return The response if it exists or std::nullopt otherwise
|
||||
*/
|
||||
[[nodiscard]] std::expected<boost::json::object, Error>
|
||||
getOrUpdate(boost::asio::yield_context yield, std::string const& cmd, Updater updater, Verifier verifier);
|
||||
[[nodiscard]] std::optional<boost::json::object>
|
||||
get(std::string const& cmd) const;
|
||||
|
||||
/**
|
||||
* @brief Put a response into the cache if the request should be cached
|
||||
*
|
||||
* @param cmd The command to store the response for
|
||||
* @param response The response to store
|
||||
*/
|
||||
void
|
||||
put(std::string const& cmd, boost::json::object const& response);
|
||||
|
||||
/**
|
||||
* @brief Invalidate all entries in the cache
|
||||
*
|
||||
* This causes all cached entries to be cleared, forcing the next access
|
||||
* to generate new responses.
|
||||
*/
|
||||
void
|
||||
invalidate();
|
||||
};
|
||||
|
||||
} // namespace util
|
||||
|
||||
@@ -17,307 +17,53 @@
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include "rpc/Errors.hpp"
|
||||
#include "util/AsioContextTestFixture.hpp"
|
||||
#include "util/MockAssert.hpp"
|
||||
#include "util/ResponseExpirationCache.hpp"
|
||||
|
||||
#include <boost/asio/spawn.hpp>
|
||||
#include <boost/json/object.hpp>
|
||||
#include <gmock/gmock.h>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <unordered_set>
|
||||
|
||||
using namespace util;
|
||||
using testing::MockFunction;
|
||||
using testing::Return;
|
||||
using testing::StrictMock;
|
||||
|
||||
struct ResponseExpirationCacheTest : SyncAsioContextTest {
|
||||
using MockUpdater = StrictMock<MockFunction<
|
||||
std::expected<ResponseExpirationCache::EntryData, ResponseExpirationCache::Error>(boost::asio::yield_context)>>;
|
||||
using MockVerifier = StrictMock<MockFunction<bool(ResponseExpirationCache::EntryData const&)>>;
|
||||
|
||||
std::string const cmd = "server_info";
|
||||
boost::json::object const obj = {{"some key", "some value"}};
|
||||
MockUpdater mockUpdater;
|
||||
MockVerifier mockVerifier;
|
||||
struct ResponseExpirationCacheTests : public ::testing::Test {
|
||||
protected:
|
||||
ResponseExpirationCache cache_{std::chrono::seconds{100}, {"key"}};
|
||||
boost::json::object object_{{"key", "value"}};
|
||||
};
|
||||
|
||||
TEST_F(ResponseExpirationCacheTest, ShouldCacheDeterminesIfCommandIsCacheable)
|
||||
TEST_F(ResponseExpirationCacheTests, PutAndGetNotExpired)
|
||||
{
|
||||
std::unordered_set<std::string> const cmds = {cmd, "account_info"};
|
||||
ResponseExpirationCache cache{std::chrono::seconds(10), cmds};
|
||||
EXPECT_FALSE(cache_.get("key").has_value());
|
||||
|
||||
for (auto const& c : cmds) {
|
||||
EXPECT_TRUE(cache.shouldCache(c));
|
||||
}
|
||||
cache_.put("key", object_);
|
||||
auto result = cache_.get("key");
|
||||
ASSERT_TRUE(result.has_value());
|
||||
EXPECT_EQ(*result, object_);
|
||||
result = cache_.get("key2");
|
||||
ASSERT_FALSE(result.has_value());
|
||||
|
||||
EXPECT_FALSE(cache.shouldCache("account_tx"));
|
||||
EXPECT_FALSE(cache.shouldCache("ledger"));
|
||||
EXPECT_FALSE(cache.shouldCache("submit"));
|
||||
EXPECT_FALSE(cache.shouldCache(""));
|
||||
cache_.put("key2", object_);
|
||||
result = cache_.get("key2");
|
||||
ASSERT_FALSE(result.has_value());
|
||||
}
|
||||
|
||||
TEST_F(ResponseExpirationCacheTest, ShouldCacheEmptySetMeansNothingCacheable)
|
||||
TEST_F(ResponseExpirationCacheTests, Invalidate)
|
||||
{
|
||||
std::unordered_set<std::string> const emptyCmds;
|
||||
ResponseExpirationCache cache{std::chrono::seconds(10), emptyCmds};
|
||||
|
||||
EXPECT_FALSE(cache.shouldCache("server_info"));
|
||||
EXPECT_FALSE(cache.shouldCache("account_info"));
|
||||
EXPECT_FALSE(cache.shouldCache("any_command"));
|
||||
EXPECT_FALSE(cache.shouldCache(""));
|
||||
cache_.put("key", object_);
|
||||
cache_.invalidate();
|
||||
EXPECT_FALSE(cache_.get("key").has_value());
|
||||
}
|
||||
|
||||
TEST_F(ResponseExpirationCacheTest, ShouldCacheCaseMatchingIsRequired)
|
||||
TEST_F(ResponseExpirationCacheTests, GetExpired)
|
||||
{
|
||||
std::unordered_set<std::string> const specificCmds = {cmd};
|
||||
ResponseExpirationCache cache{std::chrono::seconds(10), specificCmds};
|
||||
ResponseExpirationCache cache{std::chrono::milliseconds{1}, {"key"}};
|
||||
auto const response = boost::json::object{{"key", "value"}};
|
||||
|
||||
EXPECT_TRUE(cache.shouldCache(cmd));
|
||||
EXPECT_FALSE(cache.shouldCache("SERVER_INFO"));
|
||||
EXPECT_FALSE(cache.shouldCache("Server_Info"));
|
||||
}
|
||||
|
||||
TEST_F(ResponseExpirationCacheTest, GetOrUpdateNoValueInCacheCallsUpdaterAndVerifier)
|
||||
{
|
||||
ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}};
|
||||
|
||||
runSpawn([&](boost::asio::yield_context yield) {
|
||||
EXPECT_CALL(mockUpdater, Call)
|
||||
.WillOnce(Return(
|
||||
ResponseExpirationCache::EntryData{
|
||||
.lastUpdated = std::chrono::steady_clock::now(),
|
||||
.response = obj,
|
||||
}
|
||||
));
|
||||
EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true));
|
||||
|
||||
auto result =
|
||||
cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
|
||||
|
||||
ASSERT_TRUE(result.has_value());
|
||||
EXPECT_EQ(result.value(), obj);
|
||||
});
|
||||
}
|
||||
|
||||
TEST_F(ResponseExpirationCacheTest, GetOrUpdateExpiredValueInCacheCallsUpdaterAndVerifier)
|
||||
{
|
||||
ResponseExpirationCache cache{std::chrono::milliseconds(1), {cmd}};
|
||||
|
||||
runSpawn([&](boost::asio::yield_context yield) {
|
||||
boost::json::object const expiredObject = {{"some key", "expired value"}};
|
||||
EXPECT_CALL(mockUpdater, Call)
|
||||
.WillOnce(Return(
|
||||
ResponseExpirationCache::EntryData{
|
||||
.lastUpdated = std::chrono::steady_clock::now(),
|
||||
.response = expiredObject,
|
||||
}
|
||||
));
|
||||
EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true));
|
||||
|
||||
auto result =
|
||||
cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
|
||||
|
||||
ASSERT_TRUE(result.has_value());
|
||||
EXPECT_EQ(result.value(), expiredObject);
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(2));
|
||||
|
||||
EXPECT_CALL(mockUpdater, Call)
|
||||
.WillOnce(Return(
|
||||
ResponseExpirationCache::EntryData{.lastUpdated = std::chrono::steady_clock::now(), .response = obj}
|
||||
));
|
||||
EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true));
|
||||
|
||||
result = cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
|
||||
|
||||
ASSERT_TRUE(result.has_value());
|
||||
EXPECT_EQ(result.value(), obj);
|
||||
});
|
||||
}
|
||||
|
||||
TEST_F(ResponseExpirationCacheTest, GetOrUpdateCachedValueNotExpiredDoesNotCallUpdaterOrVerifier)
|
||||
{
|
||||
ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}};
|
||||
|
||||
runSpawn([&](boost::asio::yield_context yield) {
|
||||
// First call to populate cache
|
||||
EXPECT_CALL(mockUpdater, Call)
|
||||
.WillOnce(Return(
|
||||
ResponseExpirationCache::EntryData{
|
||||
.lastUpdated = std::chrono::steady_clock::now(),
|
||||
.response = obj,
|
||||
}
|
||||
));
|
||||
EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true));
|
||||
|
||||
auto result =
|
||||
cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
|
||||
|
||||
ASSERT_TRUE(result.has_value());
|
||||
EXPECT_EQ(result.value(), obj);
|
||||
|
||||
// Second call should use cached value and not call updater/verifier
|
||||
result = cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
|
||||
|
||||
ASSERT_TRUE(result.has_value());
|
||||
EXPECT_EQ(result.value(), obj);
|
||||
});
|
||||
}
|
||||
|
||||
TEST_F(ResponseExpirationCacheTest, GetOrUpdateHandlesErrorFromUpdater)
|
||||
{
|
||||
ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}};
|
||||
|
||||
ResponseExpirationCache::Error const error{
|
||||
.status = rpc::Status{rpc::ClioError::EtlConnectionError}, .warnings = {}
|
||||
};
|
||||
|
||||
runSpawn([&](boost::asio::yield_context yield) {
|
||||
EXPECT_CALL(mockUpdater, Call).WillOnce(Return(std::unexpected(error)));
|
||||
|
||||
auto result =
|
||||
cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
|
||||
|
||||
ASSERT_FALSE(result.has_value());
|
||||
EXPECT_EQ(result.error(), error);
|
||||
});
|
||||
}
|
||||
|
||||
TEST_F(ResponseExpirationCacheTest, GetOrUpdateVerifierRejection)
|
||||
{
|
||||
ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}};
|
||||
|
||||
runSpawn([&](boost::asio::yield_context yield) {
|
||||
EXPECT_CALL(mockUpdater, Call)
|
||||
.WillOnce(Return(
|
||||
ResponseExpirationCache::EntryData{
|
||||
.lastUpdated = std::chrono::steady_clock::now(),
|
||||
.response = obj,
|
||||
}
|
||||
));
|
||||
EXPECT_CALL(mockVerifier, Call).WillOnce(Return(false));
|
||||
|
||||
auto result =
|
||||
cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
|
||||
|
||||
ASSERT_TRUE(result.has_value());
|
||||
EXPECT_EQ(result.value(), obj);
|
||||
|
||||
boost::json::object const anotherObj = {{"some key", "another value"}};
|
||||
EXPECT_CALL(mockUpdater, Call)
|
||||
.WillOnce(Return(
|
||||
ResponseExpirationCache::EntryData{
|
||||
.lastUpdated = std::chrono::steady_clock::now(),
|
||||
.response = anotherObj,
|
||||
}
|
||||
));
|
||||
EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true));
|
||||
|
||||
result = cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
|
||||
|
||||
ASSERT_TRUE(result.has_value());
|
||||
EXPECT_EQ(result.value(), anotherObj);
|
||||
});
|
||||
}
|
||||
|
||||
TEST_F(ResponseExpirationCacheTest, GetOrUpdateMultipleConcurrentUpdates)
|
||||
{
|
||||
ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}};
|
||||
bool waitingCoroutineFinished = false;
|
||||
|
||||
auto waitingCoroutine = [&](boost::asio::yield_context yield) {
|
||||
auto result =
|
||||
cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
|
||||
|
||||
ASSERT_TRUE(result.has_value());
|
||||
EXPECT_EQ(result.value(), obj);
|
||||
waitingCoroutineFinished = true;
|
||||
};
|
||||
|
||||
EXPECT_CALL(mockUpdater, Call)
|
||||
.WillOnce(
|
||||
[this, &waitingCoroutine](
|
||||
boost::asio::yield_context yield
|
||||
) -> std::expected<ResponseExpirationCache::EntryData, ResponseExpirationCache::Error> {
|
||||
boost::asio::spawn(yield, waitingCoroutine);
|
||||
return ResponseExpirationCache::EntryData{
|
||||
.lastUpdated = std::chrono::steady_clock::now(),
|
||||
.response = obj,
|
||||
};
|
||||
}
|
||||
);
|
||||
EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true));
|
||||
|
||||
runSpawnWithTimeout(std::chrono::seconds{1}, [&](boost::asio::yield_context yield) {
|
||||
auto result =
|
||||
cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
|
||||
|
||||
ASSERT_TRUE(result.has_value());
|
||||
EXPECT_EQ(result.value(), obj);
|
||||
ASSERT_FALSE(waitingCoroutineFinished);
|
||||
});
|
||||
}
|
||||
|
||||
TEST_F(ResponseExpirationCacheTest, InvalidateForcesRefresh)
|
||||
{
|
||||
ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}};
|
||||
|
||||
runSpawn([&](boost::asio::yield_context yield) {
|
||||
boost::json::object const oldObject = {{"some key", "old value"}};
|
||||
EXPECT_CALL(mockUpdater, Call)
|
||||
.WillOnce(Return(
|
||||
ResponseExpirationCache::EntryData{
|
||||
.lastUpdated = std::chrono::steady_clock::now(),
|
||||
.response = oldObject,
|
||||
}
|
||||
));
|
||||
EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true));
|
||||
|
||||
auto result =
|
||||
cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
|
||||
|
||||
ASSERT_TRUE(result.has_value());
|
||||
EXPECT_EQ(result.value(), oldObject);
|
||||
|
||||
cache.invalidate();
|
||||
|
||||
EXPECT_CALL(mockUpdater, Call)
|
||||
.WillOnce(Return(
|
||||
ResponseExpirationCache::EntryData{
|
||||
.lastUpdated = std::chrono::steady_clock::now(),
|
||||
.response = obj,
|
||||
}
|
||||
));
|
||||
EXPECT_CALL(mockVerifier, Call).WillOnce(Return(true));
|
||||
|
||||
result = cache.getOrUpdate(yield, "server_info", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction());
|
||||
|
||||
ASSERT_TRUE(result.has_value());
|
||||
EXPECT_EQ(result.value(), obj);
|
||||
});
|
||||
}
|
||||
|
||||
struct ResponseExpirationCacheAssertTest : common::util::WithMockAssert, ResponseExpirationCacheTest {};
|
||||
|
||||
TEST_F(ResponseExpirationCacheAssertTest, NonCacheableCommandThrowsAssertion)
|
||||
{
|
||||
ResponseExpirationCache cache{std::chrono::seconds(10), {cmd}};
|
||||
|
||||
ASSERT_FALSE(cache.shouldCache("non_cacheable_command"));
|
||||
|
||||
runSpawn([&](boost::asio::yield_context yield) {
|
||||
EXPECT_CLIO_ASSERT_FAIL({
|
||||
[[maybe_unused]]
|
||||
auto const v = cache.getOrUpdate(
|
||||
yield, "non_cacheable_command", mockUpdater.AsStdFunction(), mockVerifier.AsStdFunction()
|
||||
);
|
||||
});
|
||||
});
|
||||
cache.put("key", response);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds{2});
|
||||
|
||||
auto const result = cache.get("key");
|
||||
EXPECT_FALSE(result);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user