From 166ff63dbcec7e1ad58335662d96c2d9674e54a9 Mon Sep 17 00:00:00 2001 From: Nathan Nichols Date: Wed, 15 Jun 2022 16:18:25 -0500 Subject: [PATCH] cache commands that dont take parameters (#153) * Adds a forwardCache to each ETLSource which allows operators to specify which commands (that don't require parameters) they want to cache. --- src/etl/ETLSource.cpp | 102 +++++++++++++++++++++++++++----- src/etl/ETLSource.h | 76 ++++++++++++++++++++++++ src/rpc/handlers/ServerInfo.cpp | 2 +- 3 files changed, 164 insertions(+), 16 deletions(-) diff --git a/src/etl/ETLSource.cpp b/src/etl/ETLSource.cpp index 51067fb70..a4f47cf31 100644 --- a/src/etl/ETLSource.cpp +++ b/src/etl/ETLSource.cpp @@ -11,6 +11,64 @@ #include #include +void +ForwardCache::freshen() +{ + BOOST_LOG_TRIVIAL(trace) << "Freshening ForwardCache"; + + auto numOutstanding = + std::make_shared(latestForwarded_.size()); + + for (auto const& cacheEntry : latestForwarded_) + { + boost::asio::spawn( + strand_, + [this, numOutstanding, command = cacheEntry.first]( + boost::asio::yield_context yield) { + boost::json::object request = {{"command", command}}; + auto resp = source_.requestFromRippled(request, {}, yield); + + if (!resp || resp->contains("error")) + resp = {}; + + { + std::unique_lock lk(mtx_); + latestForwarded_[command] = resp; + } + }); + } +} + +void +ForwardCache::clear() +{ + std::unique_lock lk(mtx_); + for (auto& cacheEntry : latestForwarded_) + latestForwarded_[cacheEntry.first] = {}; +} + +std::optional +ForwardCache::get(boost::json::object const& request) const +{ + std::optional command = {}; + if (request.contains("command") && !request.contains("method") && + request.at("command").is_string()) + command = request.at("command").as_string().c_str(); + else if ( + request.contains("method") && !request.contains("command") && + request.at("method").is_string()) + command = request.at("method").as_string().c_str(); + + if (!command) + return {}; + + std::shared_lock lk(mtx_); + if (!latestForwarded_.contains(*command)) + return {}; + + return {latestForwarded_.at(*command)}; +} + // Create ETL source without grpc endpoint // Fetch ledger and load initial ledger will fail for this source // Primarly used in read-only mode, to monitor when ledgers are validated @@ -27,6 +85,7 @@ ETLSourceImpl::ETLSourceImpl( , backend_(backend) , subscriptions_(subscriptions) , balancer_(balancer) + , forwardCache_(config, ioContext, *this) , ioc_(ioContext) , timer_(ioContext) { @@ -245,11 +304,9 @@ PlainETLSource::onConnect( boost::beast::websocket::stream_base::decorator( [](boost::beast::websocket::request_type& req) { req.set( - boost::beast::http::field::user_agent, - std::string(BOOST_BEAST_VERSION_STRING) + - " clio-client"); + boost::beast::http::field::user_agent, "clio-client"); - req.set("X-User", "coro-client"); + req.set("X-User", "clio-client"); })); // Update the host_ string. This will provide the value of the @@ -291,11 +348,9 @@ SslETLSource::onConnect( boost::beast::websocket::stream_base::decorator( [](boost::beast::websocket::request_type& req) { req.set( - boost::beast::http::field::user_agent, - std::string(BOOST_BEAST_VERSION_STRING) + - " clio-client"); + boost::beast::http::field::user_agent, "clio-client"); - req.set("X-User", "coro-client"); + req.set("X-User", "clio-client"); })); // Update the host_ string. This will provide the value of the @@ -475,6 +530,7 @@ ETLSourceImpl::handleMessage() { if (response.contains("transaction")) { + forwardCache_.freshen(); subscriptions_->forwardProposedTransaction(response); } else if ( @@ -1026,7 +1082,23 @@ ETLSourceImpl::forwardToRippled( std::string const& clientIp, boost::asio::yield_context& yield) const { - BOOST_LOG_TRIVIAL(debug) << "Attempting to forward request to tx. " + if (auto resp = forwardCache_.get(request); resp) + { + BOOST_LOG_TRIVIAL(debug) << "request hit forwardCache"; + return resp; + } + + return requestFromRippled(request, clientIp, yield); +} + +template +std::optional +ETLSourceImpl::requestFromRippled( + boost::json::object const& request, + std::string const& clientIp, + boost::asio::yield_context& yield) const +{ + BOOST_LOG_TRIVIAL(trace) << "Attempting to forward request to tx. " << "request = " << boost::json::serialize(request); boost::json::object response; @@ -1047,7 +1119,7 @@ ETLSourceImpl::forwardToRippled( // These objects perform our I/O tcp::resolver resolver{ioc_}; - BOOST_LOG_TRIVIAL(debug) << "Creating websocket"; + BOOST_LOG_TRIVIAL(trace) << "Creating websocket"; auto ws = std::make_unique>(ioc_); // Look up the domain name @@ -1057,7 +1129,7 @@ ETLSourceImpl::forwardToRippled( ws->next_layer().expires_after(std::chrono::seconds(3)); - BOOST_LOG_TRIVIAL(debug) << "Connecting websocket"; + BOOST_LOG_TRIVIAL(trace) << "Connecting websocket"; // Make the connection on the IP address we get from a lookup ws->next_layer().async_connect(results, yield[ec]); if (ec) @@ -1076,15 +1148,15 @@ ETLSourceImpl::forwardToRippled( " websocket-client-coro"); req.set(http::field::forwarded, "for=" + clientIp); })); - BOOST_LOG_TRIVIAL(debug) << "client ip: " << clientIp; + BOOST_LOG_TRIVIAL(trace) << "client ip: " << clientIp; - BOOST_LOG_TRIVIAL(debug) << "Performing websocket handshake"; + BOOST_LOG_TRIVIAL(trace) << "Performing websocket handshake"; // Perform the websocket handshake ws->async_handshake(ip_, "/", yield[ec]); if (ec) return {}; - BOOST_LOG_TRIVIAL(debug) << "Sending request"; + BOOST_LOG_TRIVIAL(trace) << "Sending request"; // Send the message ws->async_write( net::buffer(boost::json::serialize(request)), yield[ec]); @@ -1106,7 +1178,7 @@ ETLSourceImpl::forwardToRippled( << "Error parsing response: " << std::string{begin, end}; return {}; } - BOOST_LOG_TRIVIAL(debug) << "Successfully forward request"; + BOOST_LOG_TRIVIAL(trace) << "Successfully forward request"; response = parsed.as_object(); diff --git a/src/etl/ETLSource.h b/src/etl/ETLSource.h index 34dba113d..1e9d017eb 100644 --- a/src/etl/ETLSource.h +++ b/src/etl/ETLSource.h @@ -15,6 +15,7 @@ #include class ETLLoadBalancer; +class ETLSource; class SubscriptionManager; /// This class manages a connection to a single ETL source. This is almost @@ -24,6 +25,64 @@ class SubscriptionManager; /// has. This class also has methods for extracting said ledgers. Lastly this /// class forwards transactions received on the transactions_proposed streams to /// any subscribers. +class ForwardCache +{ + using response_type = std::optional; + + mutable std::atomic_bool stopping_ = false; + mutable std::shared_mutex mtx_; + std::unordered_map latestForwarded_; + + boost::asio::io_context::strand strand_; + boost::asio::steady_timer timer_; + ETLSource const& source_; + std::uint32_t duration_ = 10; + + void + clear(); + +public: + ForwardCache( + boost::json::object const& config, + boost::asio::io_context& ioc, + ETLSource const& source) + : strand_(ioc), timer_(strand_), source_(source) + { + if (config.contains("cache") && !config.at("cache").is_array()) + throw std::runtime_error("ETLSource cache must be array"); + + if (config.contains("cache_duration") && + !config.at("cache_duration").is_int64()) + throw std::runtime_error( + "ETLSource cache_duration must be a number"); + + duration_ = config.contains("cache_duration") + ? config.at("cache_duration").as_int64() + : 10; + + auto commands = config.contains("cache") ? config.at("cache").as_array() + : boost::json::array{}; + + for (auto const& command : commands) + { + if (!command.is_string()) + throw std::runtime_error( + "ETLSource forward command must be array of strings"); + + latestForwarded_[command.as_string().c_str()] = {}; + } + } + + // This is to be called every freshenDuration_ seconds. + // It will request information from this etlSource, and + // will populate the cache with the latest value. If the + // request fails, it will evict that value from the cache. + void + freshen(); + + std::optional + get(boost::json::object const& command) const; +}; class ETLSource { @@ -64,6 +123,15 @@ public: virtual ~ETLSource() { } + +private: + friend ForwardCache; + + virtual std::optional + requestFromRippled( + boost::json::object const& request, + std::string const& clientIp, + boost::asio::yield_context& yield) const = 0; }; template @@ -105,6 +173,14 @@ class ETLSourceImpl : public ETLSource std::shared_ptr subscriptions_; ETLLoadBalancer& balancer_; + ForwardCache forwardCache_; + + std::optional + requestFromRippled( + boost::json::object const& request, + std::string const& clientIp, + boost::asio::yield_context& yield) const override; + protected: Derived& derived() diff --git a/src/rpc/handlers/ServerInfo.cpp b/src/rpc/handlers/ServerInfo.cpp index 6d07588a6..df2c9de5d 100644 --- a/src/rpc/handlers/ServerInfo.cpp +++ b/src/rpc/handlers/ServerInfo.cpp @@ -49,7 +49,7 @@ doServerInfo(Context const& context) context.subscriptions->report(); auto serverInfoRippled = context.balancer->forwardToRippled( - {{"counters", "server_info"}}, context.clientIp, context.yield); + {{"command", "server_info"}}, context.clientIp, context.yield); info[JS(load_factor)] = 1; info["clio_version"] = Build::getClioVersionString();