cache commands that dont take parameters (#153)

* Adds a forwardCache to each ETLSource which allows operators to specify which commands (that don't require parameters) they want to cache.
This commit is contained in:
Nathan Nichols
2022-06-15 16:18:25 -05:00
committed by GitHub
parent b7ae6a0495
commit 166ff63dbc
3 changed files with 164 additions and 16 deletions

View File

@@ -11,6 +11,64 @@
#include <etl/ReportingETL.h>
#include <thread>
void
ForwardCache::freshen()
{
BOOST_LOG_TRIVIAL(trace) << "Freshening ForwardCache";
auto numOutstanding =
std::make_shared<std::atomic_uint>(latestForwarded_.size());
for (auto const& cacheEntry : latestForwarded_)
{
boost::asio::spawn(
strand_,
[this, numOutstanding, command = cacheEntry.first](
boost::asio::yield_context yield) {
boost::json::object request = {{"command", command}};
auto resp = source_.requestFromRippled(request, {}, yield);
if (!resp || resp->contains("error"))
resp = {};
{
std::unique_lock lk(mtx_);
latestForwarded_[command] = resp;
}
});
}
}
void
ForwardCache::clear()
{
std::unique_lock lk(mtx_);
for (auto& cacheEntry : latestForwarded_)
latestForwarded_[cacheEntry.first] = {};
}
std::optional<boost::json::object>
ForwardCache::get(boost::json::object const& request) const
{
std::optional<std::string> command = {};
if (request.contains("command") && !request.contains("method") &&
request.at("command").is_string())
command = request.at("command").as_string().c_str();
else if (
request.contains("method") && !request.contains("command") &&
request.at("method").is_string())
command = request.at("method").as_string().c_str();
if (!command)
return {};
std::shared_lock lk(mtx_);
if (!latestForwarded_.contains(*command))
return {};
return {latestForwarded_.at(*command)};
}
// Create ETL source without grpc endpoint
// Fetch ledger and load initial ledger will fail for this source
// Primarly used in read-only mode, to monitor when ledgers are validated
@@ -27,6 +85,7 @@ ETLSourceImpl<Derived>::ETLSourceImpl(
, backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
, forwardCache_(config, ioContext, *this)
, ioc_(ioContext)
, timer_(ioContext)
{
@@ -245,11 +304,9 @@ PlainETLSource::onConnect(
boost::beast::websocket::stream_base::decorator(
[](boost::beast::websocket::request_type& req) {
req.set(
boost::beast::http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) +
" clio-client");
boost::beast::http::field::user_agent, "clio-client");
req.set("X-User", "coro-client");
req.set("X-User", "clio-client");
}));
// Update the host_ string. This will provide the value of the
@@ -291,11 +348,9 @@ SslETLSource::onConnect(
boost::beast::websocket::stream_base::decorator(
[](boost::beast::websocket::request_type& req) {
req.set(
boost::beast::http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) +
" clio-client");
boost::beast::http::field::user_agent, "clio-client");
req.set("X-User", "coro-client");
req.set("X-User", "clio-client");
}));
// Update the host_ string. This will provide the value of the
@@ -475,6 +530,7 @@ ETLSourceImpl<Derived>::handleMessage()
{
if (response.contains("transaction"))
{
forwardCache_.freshen();
subscriptions_->forwardProposedTransaction(response);
}
else if (
@@ -1026,7 +1082,23 @@ ETLSourceImpl<Derived>::forwardToRippled(
std::string const& clientIp,
boost::asio::yield_context& yield) const
{
BOOST_LOG_TRIVIAL(debug) << "Attempting to forward request to tx. "
if (auto resp = forwardCache_.get(request); resp)
{
BOOST_LOG_TRIVIAL(debug) << "request hit forwardCache";
return resp;
}
return requestFromRippled(request, clientIp, yield);
}
template <class Derived>
std::optional<boost::json::object>
ETLSourceImpl<Derived>::requestFromRippled(
boost::json::object const& request,
std::string const& clientIp,
boost::asio::yield_context& yield) const
{
BOOST_LOG_TRIVIAL(trace) << "Attempting to forward request to tx. "
<< "request = " << boost::json::serialize(request);
boost::json::object response;
@@ -1047,7 +1119,7 @@ ETLSourceImpl<Derived>::forwardToRippled(
// These objects perform our I/O
tcp::resolver resolver{ioc_};
BOOST_LOG_TRIVIAL(debug) << "Creating websocket";
BOOST_LOG_TRIVIAL(trace) << "Creating websocket";
auto ws = std::make_unique<websocket::stream<beast::tcp_stream>>(ioc_);
// Look up the domain name
@@ -1057,7 +1129,7 @@ ETLSourceImpl<Derived>::forwardToRippled(
ws->next_layer().expires_after(std::chrono::seconds(3));
BOOST_LOG_TRIVIAL(debug) << "Connecting websocket";
BOOST_LOG_TRIVIAL(trace) << "Connecting websocket";
// Make the connection on the IP address we get from a lookup
ws->next_layer().async_connect(results, yield[ec]);
if (ec)
@@ -1076,15 +1148,15 @@ ETLSourceImpl<Derived>::forwardToRippled(
" websocket-client-coro");
req.set(http::field::forwarded, "for=" + clientIp);
}));
BOOST_LOG_TRIVIAL(debug) << "client ip: " << clientIp;
BOOST_LOG_TRIVIAL(trace) << "client ip: " << clientIp;
BOOST_LOG_TRIVIAL(debug) << "Performing websocket handshake";
BOOST_LOG_TRIVIAL(trace) << "Performing websocket handshake";
// Perform the websocket handshake
ws->async_handshake(ip_, "/", yield[ec]);
if (ec)
return {};
BOOST_LOG_TRIVIAL(debug) << "Sending request";
BOOST_LOG_TRIVIAL(trace) << "Sending request";
// Send the message
ws->async_write(
net::buffer(boost::json::serialize(request)), yield[ec]);
@@ -1106,7 +1178,7 @@ ETLSourceImpl<Derived>::forwardToRippled(
<< "Error parsing response: " << std::string{begin, end};
return {};
}
BOOST_LOG_TRIVIAL(debug) << "Successfully forward request";
BOOST_LOG_TRIVIAL(trace) << "Successfully forward request";
response = parsed.as_object();

View File

@@ -15,6 +15,7 @@
#include <grpcpp/grpcpp.h>
class ETLLoadBalancer;
class ETLSource;
class SubscriptionManager;
/// This class manages a connection to a single ETL source. This is almost
@@ -24,6 +25,64 @@ class SubscriptionManager;
/// has. This class also has methods for extracting said ledgers. Lastly this
/// class forwards transactions received on the transactions_proposed streams to
/// any subscribers.
class ForwardCache
{
using response_type = std::optional<boost::json::object>;
mutable std::atomic_bool stopping_ = false;
mutable std::shared_mutex mtx_;
std::unordered_map<std::string, response_type> latestForwarded_;
boost::asio::io_context::strand strand_;
boost::asio::steady_timer timer_;
ETLSource const& source_;
std::uint32_t duration_ = 10;
void
clear();
public:
ForwardCache(
boost::json::object const& config,
boost::asio::io_context& ioc,
ETLSource const& source)
: strand_(ioc), timer_(strand_), source_(source)
{
if (config.contains("cache") && !config.at("cache").is_array())
throw std::runtime_error("ETLSource cache must be array");
if (config.contains("cache_duration") &&
!config.at("cache_duration").is_int64())
throw std::runtime_error(
"ETLSource cache_duration must be a number");
duration_ = config.contains("cache_duration")
? config.at("cache_duration").as_int64()
: 10;
auto commands = config.contains("cache") ? config.at("cache").as_array()
: boost::json::array{};
for (auto const& command : commands)
{
if (!command.is_string())
throw std::runtime_error(
"ETLSource forward command must be array of strings");
latestForwarded_[command.as_string().c_str()] = {};
}
}
// This is to be called every freshenDuration_ seconds.
// It will request information from this etlSource, and
// will populate the cache with the latest value. If the
// request fails, it will evict that value from the cache.
void
freshen();
std::optional<boost::json::object>
get(boost::json::object const& command) const;
};
class ETLSource
{
@@ -64,6 +123,15 @@ public:
virtual ~ETLSource()
{
}
private:
friend ForwardCache;
virtual std::optional<boost::json::object>
requestFromRippled(
boost::json::object const& request,
std::string const& clientIp,
boost::asio::yield_context& yield) const = 0;
};
template <class Derived>
@@ -105,6 +173,14 @@ class ETLSourceImpl : public ETLSource
std::shared_ptr<SubscriptionManager> subscriptions_;
ETLLoadBalancer& balancer_;
ForwardCache forwardCache_;
std::optional<boost::json::object>
requestFromRippled(
boost::json::object const& request,
std::string const& clientIp,
boost::asio::yield_context& yield) const override;
protected:
Derived&
derived()

View File

@@ -49,7 +49,7 @@ doServerInfo(Context const& context)
context.subscriptions->report();
auto serverInfoRippled = context.balancer->forwardToRippled(
{{"counters", "server_info"}}, context.clientIp, context.yield);
{{"command", "server_info"}}, context.clientIp, context.yield);
info[JS(load_factor)] = 1;
info["clio_version"] = Build::getClioVersionString();