Add forwarding cache (#1204)

Fixes #51.
This commit is contained in:
Sergey Kuznetsov
2024-03-05 18:09:29 +00:00
committed by GitHub
parent dc8d1658e3
commit 58a1833cf2
21 changed files with 605 additions and 29 deletions

1
.gitignore vendored
View File

@@ -5,6 +5,7 @@
.cache
.vscode
.python-version
.DS_Store
CMakeUserPresets.json
config.json
src/main/impl/Build.cpp

View File

@@ -102,6 +102,7 @@ target_sources(
src/etl/LoadBalancer.cpp
src/etl/CacheLoaderSettings.cpp
src/etl/Source.cpp
src/etl/impl/ForwardingCache.cpp
src/etl/impl/ForwardingSource.cpp
src/etl/impl/GrpcSource.cpp
src/etl/impl/SubscriptionSource.cpp
@@ -191,28 +192,51 @@ if (tests)
# Common
unittests/ConfigTests.cpp
unittests/data/BackendCountersTests.cpp
unittests/data/BackendCountersTests.cpp
unittests/data/BackendFactoryTests.cpp
unittests/data/BackendFactoryTests.cpp
unittests/data/cassandra/AsyncExecutorTests.cpp
# Webserver
unittests/data/cassandra/AsyncExecutorTests.cpp
# Webserver
unittests/data/cassandra/BackendTests.cpp
unittests/data/cassandra/BackendTests.cpp
unittests/data/cassandra/BaseTests.cpp
unittests/data/cassandra/BaseTests.cpp
unittests/data/cassandra/ExecutionStrategyTests.cpp
unittests/data/cassandra/ExecutionStrategyTests.cpp
unittests/data/cassandra/RetryPolicyTests.cpp
unittests/data/cassandra/RetryPolicyTests.cpp
unittests/data/cassandra/SettingsProviderTests.cpp
unittests/data/cassandra/SettingsProviderTests.cpp
unittests/DOSGuardTests.cpp
unittests/etl/AmendmentBlockHandlerTests.cpp
unittests/etl/AmendmentBlockHandlerTests.cpp
unittests/etl/CacheLoaderSettingsTests.cpp
unittests/etl/CacheLoaderTests.cpp
unittests/etl/CacheLoaderTests.cpp
unittests/etl/CursorProviderTests.cpp
unittests/etl/ETLStateTests.cpp
unittests/etl/ETLStateTests.cpp
unittests/etl/ExtractionDataPipeTests.cpp
unittests/etl/ExtractionDataPipeTests.cpp
unittests/etl/ExtractorTests.cpp
unittests/etl/ExtractorTests.cpp
unittests/etl/ForwardingCacheTests.cpp
unittests/etl/ForwardingSourceTests.cpp
unittests/etl/ForwardingSourceTests.cpp
unittests/etl/GrpcSourceTests.cpp
unittests/etl/GrpcSourceTests.cpp
unittests/etl/LedgerPublisherTests.cpp
unittests/etl/LedgerPublisherTests.cpp
unittests/etl/SourceTests.cpp
unittests/etl/SourceTests.cpp
unittests/etl/SubscriptionSourceDependenciesTests.cpp
unittests/etl/SubscriptionSourceDependenciesTests.cpp
unittests/etl/SubscriptionSourceTests.cpp
unittests/etl/SubscriptionSourceTests.cpp
unittests/etl/TransformerTests.cpp
# RPC
unittests/etl/TransformerTests.cpp
# RPC
unittests/feed/BookChangesFeedTests.cpp
@@ -229,48 +253,92 @@ if (tests)
unittests/Playground.cpp
unittests/ProfilerTests.cpp
unittests/rpc/AmendmentsTests.cpp
unittests/rpc/AmendmentsTests.cpp
unittests/rpc/APIVersionTests.cpp
unittests/rpc/APIVersionTests.cpp
unittests/rpc/BaseTests.cpp
unittests/rpc/BaseTests.cpp
unittests/rpc/CountersTests.cpp
unittests/rpc/CountersTests.cpp
unittests/rpc/ErrorTests.cpp
unittests/rpc/ErrorTests.cpp
unittests/rpc/ForwardingProxyTests.cpp
unittests/rpc/ForwardingProxyTests.cpp
unittests/rpc/handlers/AccountChannelsTests.cpp
unittests/rpc/handlers/AccountChannelsTests.cpp
unittests/rpc/handlers/AccountCurrenciesTests.cpp
unittests/rpc/handlers/AccountCurrenciesTests.cpp
unittests/rpc/handlers/AccountInfoTests.cpp
unittests/rpc/handlers/AccountInfoTests.cpp
unittests/rpc/handlers/AccountLinesTests.cpp
unittests/rpc/handlers/AccountLinesTests.cpp
unittests/rpc/handlers/AccountNFTsTests.cpp
unittests/rpc/handlers/AccountNFTsTests.cpp
unittests/rpc/handlers/AccountObjectsTests.cpp
unittests/rpc/handlers/AccountObjectsTests.cpp
unittests/rpc/handlers/AccountOffersTests.cpp
unittests/rpc/handlers/AccountOffersTests.cpp
unittests/rpc/handlers/AccountTxTests.cpp
unittests/rpc/handlers/AccountTxTests.cpp
unittests/rpc/handlers/AMMInfoTests.cpp
# Backend
unittests/rpc/handlers/AMMInfoTests.cpp
# Backend
unittests/rpc/handlers/BookChangesTests.cpp
unittests/rpc/handlers/BookChangesTests.cpp
unittests/rpc/handlers/BookOffersTests.cpp
unittests/rpc/handlers/BookOffersTests.cpp
unittests/rpc/handlers/DefaultProcessorTests.cpp
unittests/rpc/handlers/DefaultProcessorTests.cpp
unittests/rpc/handlers/DepositAuthorizedTests.cpp
unittests/rpc/handlers/DepositAuthorizedTests.cpp
unittests/rpc/handlers/GatewayBalancesTests.cpp
unittests/rpc/handlers/GatewayBalancesTests.cpp
unittests/rpc/handlers/LedgerDataTests.cpp
unittests/rpc/handlers/LedgerDataTests.cpp
unittests/rpc/handlers/LedgerEntryTests.cpp
unittests/rpc/handlers/LedgerEntryTests.cpp
unittests/rpc/handlers/LedgerRangeTests.cpp
unittests/rpc/handlers/LedgerRangeTests.cpp
unittests/rpc/handlers/LedgerTests.cpp
unittests/rpc/handlers/LedgerTests.cpp
unittests/rpc/handlers/NFTBuyOffersTests.cpp
unittests/rpc/handlers/NFTBuyOffersTests.cpp
unittests/rpc/handlers/NFTHistoryTests.cpp
unittests/rpc/handlers/NFTHistoryTests.cpp
unittests/rpc/handlers/NFTInfoTests.cpp
unittests/rpc/handlers/NFTInfoTests.cpp
unittests/rpc/handlers/NFTsByIssuerTest.cpp
unittests/rpc/handlers/NFTsByIssuerTest.cpp
unittests/rpc/handlers/NFTSellOffersTests.cpp
unittests/rpc/handlers/NFTSellOffersTests.cpp
unittests/rpc/handlers/NoRippleCheckTests.cpp
unittests/rpc/handlers/NoRippleCheckTests.cpp
unittests/rpc/handlers/PingTests.cpp
unittests/rpc/handlers/PingTests.cpp
unittests/rpc/handlers/RandomTests.cpp
unittests/rpc/handlers/RandomTests.cpp
unittests/rpc/handlers/ServerInfoTests.cpp
unittests/rpc/handlers/ServerInfoTests.cpp
unittests/rpc/handlers/SubscribeTests.cpp
unittests/rpc/handlers/SubscribeTests.cpp
unittests/rpc/handlers/TestHandlerTests.cpp
unittests/rpc/handlers/TestHandlerTests.cpp
unittests/rpc/handlers/TransactionEntryTests.cpp
unittests/rpc/handlers/TransactionEntryTests.cpp
unittests/rpc/handlers/TxTests.cpp
unittests/rpc/handlers/TxTests.cpp
unittests/rpc/handlers/UnsubscribeTests.cpp
unittests/rpc/handlers/UnsubscribeTests.cpp
unittests/rpc/handlers/VersionHandlerTests.cpp
unittests/rpc/handlers/VersionHandlerTests.cpp
unittests/rpc/JsonBoolTests.cpp
# RPC handlers
unittests/rpc/JsonBoolTests.cpp
# RPC handlers
unittests/rpc/RPCHelpersTests.cpp
unittests/rpc/RPCHelpersTests.cpp
unittests/rpc/WorkQueueTests.cpp
unittests/rpc/WorkQueueTests.cpp
unittests/util/AssertTests.cpp
unittests/util/async/AnyExecutionContextTests.cpp

View File

@@ -83,3 +83,17 @@ By default Clio checks admin privileges by IP address from requests (only `127.0
If the password is presented in the config, Clio will check the Authorization header (if any) in each request for the password. The Authorization header should contain the type `Password`, and the password from the config (e.g. `Password secret`).
Exactly equal password gains admin rights for the request or a websocket connection.
## ETL sources forwarding cache
Clio can cache requests to ETL sources to reduce the load on the ETL source.
Only following commands are cached: `server_info`, `server_state`, `server_definitions`, `fee`, `ledger_closed`.
By default the forwarding cache is off.
To enable the caching for a source, `forwarding_cache_timeout` value should be added to the configuration file, e.g.:
```json
"forwarding_cache_timeout": 0.250,
```
`forwarding_cache_timeout` defines for how long (in seconds) a cache entry will be valid after being placed into the cache.
Zero value turns off the cache feature.

View File

@@ -35,6 +35,7 @@
"grpc_port": "50051"
}
],
"forwarding_cache_timeout": 0.250, // in seconds, could be 0, which means no cache
"dos_guard": {
// Comma-separated list of IPs to exclude from rate limiting
"whitelist": [

View File

@@ -44,7 +44,7 @@ struct ETLState {
*/
template <typename Forward>
static std::optional<ETLState>
fetchETLStateFromSource(Forward const& source) noexcept
fetchETLStateFromSource(Forward& source) noexcept
{
auto const serverInfoRippled = data::synchronous([&source](auto yield) {
return source.forwardToRippled({{"command", "server_info"}}, std::nullopt, yield);

View File

@@ -70,6 +70,13 @@ LoadBalancer::LoadBalancer(
std::shared_ptr<NetworkValidatedLedgers> validatedLedgers
)
{
auto const forwardingCacheTimeout = config.valueOr<float>("forwarding_cache_timeout", 0.f);
if (forwardingCacheTimeout > 0.f) {
forwardingCache_ = impl::ForwardingCache{
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::duration<float>{forwardingCacheTimeout})
};
}
static constexpr std::uint32_t MAX_DOWNLOAD = 256;
if (auto value = config.maybeValue<uint32_t>("num_markers"); value) {
downloadRanges_ = std::clamp(*value, 1u, MAX_DOWNLOAD);
@@ -99,7 +106,8 @@ LoadBalancer::LoadBalancer(
if (not hasForwardingSource_)
chooseForwardingSource();
},
[this]() { chooseForwardingSource(); }
[this]() { chooseForwardingSource(); },
[this]() { forwardingCache_->invalidate(); }
);
// checking etl node validity
@@ -193,22 +201,34 @@ LoadBalancer::forwardToRippled(
boost::json::object const& request,
std::optional<std::string> const& clientIp,
boost::asio::yield_context yield
) const
)
{
if (forwardingCache_) {
if (auto cachedResponse = forwardingCache_->get(request); cachedResponse) {
return cachedResponse;
}
}
std::size_t sourceIdx = 0;
if (!sources_.empty())
sourceIdx = util::Random::uniform(0ul, sources_.size() - 1);
auto numAttempts = 0u;
std::optional<boost::json::object> response;
while (numAttempts < sources_.size()) {
if (auto res = sources_[sourceIdx].forwardToRippled(request, clientIp, yield))
return res;
if (auto res = sources_[sourceIdx].forwardToRippled(request, clientIp, yield)) {
response = std::move(res);
break;
}
sourceIdx = (sourceIdx + 1) % sources_.size();
++numAttempts;
}
if (response and forwardingCache_ and not response->contains("error"))
forwardingCache_->put(request, *response);
return {};
}

View File

@@ -23,6 +23,7 @@
#include "etl/ETLHelpers.hpp"
#include "etl/ETLState.hpp"
#include "etl/Source.hpp"
#include "etl/impl/ForwardingCache.hpp"
#include "feed/SubscriptionManager.hpp"
#include "util/config/Config.hpp"
#include "util/log/Logger.hpp"
@@ -72,10 +73,12 @@ private:
static constexpr std::uint32_t DEFAULT_DOWNLOAD_RANGES = 16;
util::Logger log_{"ETL"};
// Forwarding cache must be destroyed after sources because sources have a callnack to invalidate cache
std::optional<impl::ForwardingCache> forwardingCache_;
std::vector<Source> sources_;
std::optional<ETLState> etlState_;
std::uint32_t downloadRanges_ =
DEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading intial ledger */
DEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading initial ledger */
std::atomic_bool hasForwardingSource_{false};
public:
@@ -164,7 +167,7 @@ public:
boost::json::object const& request,
std::optional<std::string> const& clientIp,
boost::asio::yield_context yield
) const;
);
/**
* @brief Return state of ETL nodes.

View File

@@ -26,7 +26,9 @@
#include <boost/asio/io_context.hpp>
#include <chrono>
#include <memory>
#include <optional>
#include <string>
#include <utility>
@@ -42,13 +44,15 @@ make_Source(
std::shared_ptr<feed::SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> validatedLedgers,
Source::OnDisconnectHook onDisconnect,
Source::OnConnectHook onConnect
Source::OnConnectHook onConnect,
Source::OnLedgerClosedHook onLedgerClosed
)
{
auto const ip = config.valueOr<std::string>("ip", {});
auto const wsPort = config.valueOr<std::string>("ws_port", {});
auto const grpcPort = config.valueOr<std::string>("grpc_port", {});
impl::ForwardingSource forwardingSource{ip, wsPort};
impl::GrpcSource grpcSource{ip, grpcPort, std::move(backend)};
auto subscriptionSource = std::make_unique<impl::SubscriptionSource>(
ioc,
@@ -57,9 +61,9 @@ make_Source(
std::move(validatedLedgers),
std::move(subscriptions),
std::move(onConnect),
std::move(onDisconnect)
std::move(onDisconnect),
std::move(onLedgerClosed)
);
impl::ForwardingSource forwardingSource{ip, wsPort};
return Source{
ip, wsPort, grpcPort, std::move(grpcSource), std::move(subscriptionSource), std::move(forwardingSource)

View File

@@ -68,6 +68,7 @@ class SourceImpl {
public:
using OnConnectHook = impl::SubscriptionSource::OnConnectHook;
using OnDisconnectHook = impl::SubscriptionSource::OnDisconnectHook;
using OnLedgerClosedHook = impl::SubscriptionSource::OnLedgerClosedHook;
/**
* @brief Construct a new SourceImpl object
@@ -80,7 +81,7 @@ public:
* @param forwardingSource The forwarding source
*/
template <typename SomeGrpcSourceType, typename SomeForwardingSourceType>
requires std::is_same_v<GrpcSourceType, SomeGrpcSourceType> &&
requires std::is_same_v<GrpcSourceType, SomeGrpcSourceType> and
std::is_same_v<ForwardingSourceType, SomeForwardingSourceType>
SourceImpl(
std::string ip,
@@ -250,7 +251,8 @@ make_Source(
std::shared_ptr<feed::SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> validatedLedgers,
Source::OnDisconnectHook onDisconnect,
Source::OnConnectHook onConnect
Source::OnConnectHook onConnect,
Source::OnLedgerClosedHook onLedgerClosed
);
} // namespace etl

View File

@@ -0,0 +1,134 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etl/impl/ForwardingCache.hpp"
#include "util/Assert.hpp"
#include <boost/json/object.hpp>
#include <boost/json/value_to.hpp>
#include <chrono>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <string>
#include <unordered_set>
#include <utility>
namespace etl::impl {
namespace {
std::optional<std::string>
getCommand(boost::json::object const& request)
{
if (not request.contains("command")) {
return std::nullopt;
}
return boost::json::value_to<std::string>(request.at("command"));
}
} // namespace
void
CacheEntry::put(boost::json::object response)
{
response_ = std::move(response);
lastUpdated_ = std::chrono::steady_clock::now();
}
std::optional<boost::json::object>
CacheEntry::get() const
{
return response_;
}
std::chrono::steady_clock::time_point
CacheEntry::lastUpdated() const
{
return lastUpdated_;
}
void
CacheEntry::invalidate()
{
response_.reset();
}
std::unordered_set<std::string> const
ForwardingCache::CACHEABLE_COMMANDS{"server_info", "server_state", "server_definitions", "fee", "ledger_closed"};
ForwardingCache::ForwardingCache(std::chrono::steady_clock::duration const cacheTimeout) : cacheTimeout_{cacheTimeout}
{
for (auto const& command : CACHEABLE_COMMANDS) {
cache_.emplace(command, CacheEntry{});
}
}
bool
ForwardingCache::shouldCache(boost::json::object const& request)
{
auto const command = getCommand(request);
return command.has_value() and CACHEABLE_COMMANDS.contains(*command);
}
std::optional<boost::json::object>
ForwardingCache::get(boost::json::object const& request) const
{
auto const command = getCommand(request);
if (not command.has_value()) {
return std::nullopt;
}
auto it = cache_.find(*command);
if (it == cache_.end())
return std::nullopt;
auto const& entry = it->second.lock<std::shared_lock>();
if (std::chrono::steady_clock::now() - entry->lastUpdated() > cacheTimeout_)
return std::nullopt;
return entry->get();
}
void
ForwardingCache::put(boost::json::object const& request, boost::json::object const& response)
{
auto const command = getCommand(request);
if (not command.has_value() or not shouldCache(request))
return;
ASSERT(cache_.contains(*command), "Command is not in the cache: {}", *command);
auto entry = cache_[*command].lock<std::unique_lock>();
entry->put(response);
}
void
ForwardingCache::invalidate()
{
for (auto& [_, entry] : cache_) {
auto entryLock = entry.lock<std::unique_lock>();
entryLock->invalidate();
}
}
} // namespace etl::impl

View File

@@ -0,0 +1,125 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/Mutex.hpp"
#include <boost/json/object.hpp>
#include <chrono>
#include <optional>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
namespace etl::impl {
/**
* @brief A class to store a cache entry.
*/
class CacheEntry {
std::chrono::steady_clock::time_point lastUpdated_;
std::optional<boost::json::object> response_;
public:
/**
* @brief Put a response into the cache
*
* @param response The response to store
*/
void
put(boost::json::object response);
/**
* @brief Get the response from the cache
*
* @return The response
*/
std::optional<boost::json::object>
get() const;
/**
* @brief Get the last time the cache was updated
*
* @return The last time the cache was updated
*/
std::chrono::steady_clock::time_point
lastUpdated() const;
/**
* @brief Invalidate the cache entry
*/
void
invalidate();
};
/**
* @brief A class to store a cache of forwarding responses
*/
class ForwardingCache {
std::chrono::steady_clock::duration cacheTimeout_;
std::unordered_map<std::string, util::Mutex<CacheEntry, std::shared_mutex>> cache_;
public:
static std::unordered_set<std::string> const CACHEABLE_COMMANDS;
/**
* @brief Construct a new Forwarding Cache object
*
* @param cacheTimeout The time for cache entries to expire
*/
ForwardingCache(std::chrono::steady_clock::duration cacheTimeout);
/**
* @brief Check if a request should be cached
*
* @param request The request to check
* @return true if the request should be cached and false otherwise
*/
[[nodiscard]] static bool
shouldCache(boost::json::object const& request);
/**
* @brief Get a response from the cache
*
* @param request The request to get the response for
* @return The response if it exists or std::nullopt otherwise
*/
[[nodiscard]] std::optional<boost::json::object>
get(boost::json::object const& request) const;
/**
* @brief Put a response into the cache if the request should be cached
*
* @param request The request to store the response for
* @param response The response to store
*/
void
put(boost::json::object const& request, boost::json::object const& response);
/**
* @brief Invalidate all entries in the cache
*/
void
invalidate();
};
} // namespace etl::impl

View File

@@ -92,6 +92,7 @@ ForwardingSource::forwardToRippled(
auto responseObject = parsedResponse.as_object();
responseObject["forwarded"] = true;
return responseObject;
}

View File

@@ -34,6 +34,7 @@ namespace etl::impl {
class ForwardingSource {
util::Logger log_;
util::requests::WsConnectionBuilder connectionBuilder_;
static constexpr std::chrono::seconds CONNECTION_TIMEOUT{3};
public:

View File

@@ -197,6 +197,8 @@ SubscriptionSource::handleMessage(std::string const& message)
auto validatedLedgers = boost::json::value_to<std::string>(object.at(JS(validated_ledgers)));
setValidatedRange(std::move(validatedLedgers));
}
if (isForwarding_)
onLedgerClosed_();
} else {
if (isForwarding_) {

View File

@@ -52,6 +52,7 @@ class SubscriptionSource {
public:
using OnConnectHook = std::function<void()>;
using OnDisconnectHook = std::function<void()>;
using OnLedgerClosedHook = std::function<void()>;
private:
util::Logger log_;
@@ -72,6 +73,7 @@ private:
OnConnectHook onConnect_;
OnDisconnectHook onDisconnect_;
OnLedgerClosedHook onLedgerClosed_;
std::atomic_bool isConnected_{false};
std::atomic_bool stop_{false};
@@ -97,6 +99,9 @@ public:
* @param validatedLedgers The network validated ledgers object
* @param subscriptions The subscription manager object
* @param onDisconnect The onDisconnect hook. Called when the connection is lost
* @param onNewLedger The onNewLedger hook. Called when a new ledger is received
* @param onLedgerClosed The onLedgerClosed hook. Called when the ledger is closed but only if the source is
* forwarding
* @param connectionTimeout The connection timeout. Defaults to 30 seconds
* @param retryDelay The retry delay. Defaults to 1 second
*/
@@ -109,6 +114,7 @@ public:
std::shared_ptr<SubscriptionManagerType> subscriptions,
OnConnectHook onConnect,
OnDisconnectHook onDisconnect,
OnLedgerClosedHook onLedgerClosed,
std::chrono::steady_clock::duration const connectionTimeout = CONNECTION_TIMEOUT,
std::chrono::steady_clock::duration const retryDelay = RETRY_DELAY
)
@@ -119,6 +125,7 @@ public:
, retry_(util::makeRetryExponentialBackoff(retryDelay, RETRY_MAX_DELAY, strand_))
, onConnect_(std::move(onConnect))
, onDisconnect_(std::move(onDisconnect))
, onLedgerClosed_(std::move(onLedgerClosed))
{
wsConnectionBuilder_.addHeader({boost::beast::http::field::user_agent, "clio-client"})
.addHeader({"X-User", "clio-client"})

View File

@@ -24,17 +24,19 @@
namespace util {
template <typename ProtectedDataType>
template <typename ProtectedDataType, typename MutextType>
class Mutex;
/**
* @brief A lock on a mutex that provides access to the protected data.
*
* @tparam ProtectedDataType data type to hold
* @tparam LockType type of lock
* @tparam MutexType type of mutex
*/
template <typename ProtectedDataType>
template <typename ProtectedDataType, template <typename> typename LockType, typename MutexType>
class Lock {
std::scoped_lock<std::mutex> lock_;
LockType<MutexType> lock_;
ProtectedDataType& data_;
public:
@@ -77,9 +79,9 @@ public:
/** @endcond */
private:
friend class Mutex<std::remove_const_t<ProtectedDataType>>;
friend class Mutex<std::remove_const_t<ProtectedDataType>, MutexType>;
explicit Lock(std::mutex& mutex, ProtectedDataType& data) : lock_(mutex), data_(data)
Lock(MutexType& mutex, ProtectedDataType& data) : lock_(mutex), data_(data)
{
}
};
@@ -88,10 +90,11 @@ private:
* @brief A container for data that is protected by a mutex. Inspired by Mutex in Rust.
*
* @tparam ProtectedDataType data type to hold
* @tparam MutexType type of mutex
*/
template <typename ProtectedDataType>
template <typename ProtectedDataType, typename MutexType = std::mutex>
class Mutex {
mutable std::mutex mutex_;
mutable MutexType mutex_;
ProtectedDataType data_;
public:
@@ -123,23 +126,27 @@ public:
/**
* @brief Lock the mutex and get a lock object allowing access to the protected data
*
* @tparam LockType The type of lock to use
* @return A lock on the mutex and a reference to the protected data
*/
Lock<ProtectedDataType const>
template <template <typename> typename LockType = std::lock_guard>
Lock<ProtectedDataType const, LockType, MutexType>
lock() const
{
return Lock<ProtectedDataType const>{mutex_, data_};
return {mutex_, data_};
}
/**
* @brief Lock the mutex and get a lock object allowing access to the protected data
*
* @tparam LockType The type of lock to use
* @return A lock on the mutex and a reference to the protected data
*/
Lock<ProtectedDataType>
template <template <typename> typename LockType = std::lock_guard>
Lock<ProtectedDataType, LockType, MutexType>
lock()
{
return Lock<ProtectedDataType>{mutex_, data_};
return {mutex_, data_};
}
};

View File

@@ -32,7 +32,7 @@ using namespace util;
using namespace testing;
struct ETLStateTest : public NoLoggerFixture {
MockSource const source = MockSource{};
MockSource source = MockSource{};
};
TEST_F(ETLStateTest, Error)

View File

@@ -0,0 +1,133 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etl/impl/ForwardingCache.hpp"
#include <boost/json/object.hpp>
#include <gtest/gtest.h>
#include <chrono>
#include <thread>
using namespace etl::impl;
struct CacheEntryTests : public ::testing::Test {
CacheEntry entry_;
boost::json::object const object_ = {{"key", "value"}};
};
TEST_F(CacheEntryTests, PutAndGet)
{
EXPECT_FALSE(entry_.get());
entry_.put(object_);
auto result = entry_.get();
ASSERT_TRUE(result);
EXPECT_EQ(*result, object_);
}
TEST_F(CacheEntryTests, LastUpdated)
{
EXPECT_EQ(entry_.lastUpdated().time_since_epoch().count(), 0);
entry_.put(object_);
auto const lastUpdated = entry_.lastUpdated();
EXPECT_GE(
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - lastUpdated).count(), 0
);
entry_.put(boost::json::object{{"key", "new value"}});
auto const newLastUpdated = entry_.lastUpdated();
EXPECT_GT(newLastUpdated, lastUpdated);
EXPECT_GE(
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - newLastUpdated)
.count(),
0
);
}
TEST_F(CacheEntryTests, Invalidate)
{
entry_.put(object_);
entry_.invalidate();
EXPECT_FALSE(entry_.get());
}
TEST(ForwardingCacheTests, ShouldCache)
{
for (auto const& command : ForwardingCache::CACHEABLE_COMMANDS) {
auto const request = boost::json::object{{"command", command}};
EXPECT_TRUE(ForwardingCache::shouldCache(request));
}
auto const request = boost::json::object{{"command", "tx"}};
EXPECT_FALSE(ForwardingCache::shouldCache(request));
auto const requestWithoutCommand = boost::json::object{{"key", "value"}};
EXPECT_FALSE(ForwardingCache::shouldCache(requestWithoutCommand));
}
TEST(ForwardingCacheTests, Get)
{
ForwardingCache cache{std::chrono::seconds{100}};
auto const request = boost::json::object{{"command", "server_info"}};
auto const response = boost::json::object{{"key", "value"}};
cache.put(request, response);
auto const result = cache.get(request);
ASSERT_TRUE(result);
EXPECT_EQ(*result, response);
}
TEST(ForwardingCacheTests, GetExpired)
{
ForwardingCache cache{std::chrono::milliseconds{1}};
auto const request = boost::json::object{{"command", "server_info"}};
auto const response = boost::json::object{{"key", "value"}};
cache.put(request, response);
std::this_thread::sleep_for(std::chrono::milliseconds{2});
auto const result = cache.get(request);
EXPECT_FALSE(result);
}
TEST(ForwardingCacheTests, GetAndPutNotCommand)
{
ForwardingCache cache{std::chrono::seconds{100}};
auto const request = boost::json::object{{"key", "value"}};
auto const response = boost::json::object{{"key", "value"}};
cache.put(request, response);
auto const result = cache.get(request);
EXPECT_FALSE(result);
}
TEST(ForwardingCache, Invalidate)
{
ForwardingCache cache{std::chrono::seconds{100}};
auto const request = boost::json::object{{"command", "server_info"}};
auto const response = boost::json::object{{"key", "value"}};
cache.put(request, response);
cache.invalidate();
EXPECT_FALSE(cache.get(request));
}

View File

@@ -28,6 +28,7 @@
#include <gtest/gtest.h>
#include <chrono>
#include <optional>
#include <string>
#include <utility>
@@ -47,7 +48,7 @@ TEST_F(ForwardingSourceTests, ConnectionFailed)
}
struct ForwardingSourceOperationsTests : ForwardingSourceTests {
std::string const message_ = R"({"data":"some_data"})";
std::string const message_ = R"({"data": "some_data"})";
boost::json::object const reply_ = {{"reply", "some_reply"}};
TestWsConnection
@@ -83,7 +84,7 @@ TEST_F(ForwardingSourceOperationsTests, ParseFailed)
auto receivedMessage = connection.receive(yield);
[&]() { ASSERT_TRUE(receivedMessage); }();
EXPECT_EQ(*receivedMessage, message_);
EXPECT_EQ(boost::json::parse(*receivedMessage), boost::json::parse(message_)) << *receivedMessage;
auto sendError = connection.send("invalid_json", yield);
[&]() { ASSERT_FALSE(sendError) << *sendError; }();
@@ -97,6 +98,28 @@ TEST_F(ForwardingSourceOperationsTests, ParseFailed)
});
}
TEST_F(ForwardingSourceOperationsTests, GotNotAnObject)
{
boost::asio::spawn(ctx, [&](boost::asio::yield_context yield) {
auto connection = serverConnection(yield);
auto receivedMessage = connection.receive(yield);
[&]() { ASSERT_TRUE(receivedMessage); }();
EXPECT_EQ(boost::json::parse(*receivedMessage), boost::json::parse(message_)) << *receivedMessage;
auto sendError = connection.send(R"(["some_value"])", yield);
[&]() { ASSERT_FALSE(sendError) << *sendError; }();
connection.close(yield);
});
runSpawn([&](boost::asio::yield_context yield) {
auto result = forwardingSource.forwardToRippled(boost::json::parse(message_).as_object(), {}, yield);
EXPECT_FALSE(result);
});
}
TEST_F(ForwardingSourceOperationsTests, Success)
{
boost::asio::spawn(ctx, [&](boost::asio::yield_context yield) {
@@ -104,7 +127,7 @@ TEST_F(ForwardingSourceOperationsTests, Success)
auto receivedMessage = connection.receive(yield);
[&]() { ASSERT_TRUE(receivedMessage); }();
EXPECT_EQ(*receivedMessage, message_);
EXPECT_EQ(boost::json::parse(*receivedMessage), boost::json::parse(message_)) << *receivedMessage;
auto sendError = connection.send(boost::json::serialize(reply_), yield);
[&]() { ASSERT_FALSE(sendError) << *sendError; }();

View File

@@ -67,7 +67,8 @@ struct ForwardingSourceMock {
MOCK_METHOD(
ForwardToRippledReturnType,
forwardToRippled,
(boost::json::object const&, ClientIpOpt const&, boost::asio::yield_context)
(boost::json::object const&, ClientIpOpt const&, boost::asio::yield_context),
(const)
);
};

View File

@@ -60,6 +60,7 @@ struct SubscriptionSourceConnectionTests : public NoLoggerFixture {
StrictMock<MockFunction<void()>> onConnectHook_;
StrictMock<MockFunction<void()>> onDisconnectHook_;
StrictMock<MockFunction<void()>> onLedgerClosedHook_;
std::unique_ptr<SubscriptionSource> subscriptionSource_ = std::make_unique<SubscriptionSource>(
ioContext_,
@@ -69,6 +70,7 @@ struct SubscriptionSourceConnectionTests : public NoLoggerFixture {
subscriptionManager_,
onConnectHook_.AsStdFunction(),
onDisconnectHook_.AsStdFunction(),
onLedgerClosedHook_.AsStdFunction(),
std::chrono::milliseconds(1),
std::chrono::milliseconds(1)
);
@@ -299,10 +301,37 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndexAndValidatedLedgers)
EXPECT_FALSE(subscriptionSource_->hasLedger(4));
}
TEST_F(SubscriptionSourceReadTests, GotLedgerClosed)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"type":"ledgerClosed"})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_->stop(); });
ioContext_.run();
}
TEST_F(SubscriptionSourceReadTests, GotLedgerClosedForwardingIsSet)
{
subscriptionSource_->setForwarding(true);
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"type": "ledgerClosed"})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onLedgerClosedHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_->stop(); });
ioContext_.run();
}
TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndex)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"type":"ledgerClosed","ledger_index":123})", yield);
auto connection = connectAndSendMessage(R"({"type": "ledgerClosed","ledger_index": 123})", yield);
connection.close(yield);
});