diff --git a/CMakeLists.txt b/CMakeLists.txt index 6b619491..6abfbeda 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,7 +11,9 @@ endif() add_library(clio) target_include_directories(clio PUBLIC src) + include(FetchContent) +include(ExternalProject) #include(CMake/deps/) # TODO: see if this works... include(CMake/settings.cmake) include(CMake/deps/rippled.cmake) diff --git a/src/backend/CassandraBackend.h b/src/backend/CassandraBackend.h index b85d25e5..74904070 100644 --- a/src/backend/CassandraBackend.h +++ b/src/backend/CassandraBackend.h @@ -14,6 +14,7 @@ #include #include #include +#include namespace Backend { diff --git a/src/backend/PostgresBackend.cpp b/src/backend/PostgresBackend.cpp index 245aebb1..c69e186e 100644 --- a/src/backend/PostgresBackend.cpp +++ b/src/backend/PostgresBackend.cpp @@ -1,6 +1,7 @@ #include #include #include +#include namespace Backend { PostgresBackend::PostgresBackend(boost::json::object const& config) diff --git a/src/etl/ETLSource.cpp b/src/etl/ETLSource.cpp index c904ad4e..0c6c1c3c 100644 --- a/src/etl/ETLSource.cpp +++ b/src/etl/ETLSource.cpp @@ -9,6 +9,7 @@ #include #include #include +#include // Create ETL source without grpc endpoint // Fetch ledger and load initial ledger will fail for this source @@ -921,37 +922,15 @@ ETLLoadBalancer::fetchLedger( return {}; } -std::unique_ptr -ETLLoadBalancer::getRippledForwardingStub() const -{ - if (sources_.size() == 0) - return nullptr; - srand((unsigned)time(0)); - auto sourceIdx = rand() % sources_.size(); - auto numAttempts = 0; - while (numAttempts < sources_.size()) - { - auto stub = sources_[sourceIdx]->getRippledForwardingStub(); - if (!stub) - { - sourceIdx = (sourceIdx + 1) % sources_.size(); - ++numAttempts; - continue; - } - return stub; - } - return nullptr; -} - std::optional -ETLLoadBalancer::forwardToRippled(boost::json::object const& request) const +ETLLoadBalancer::forwardToRippled(boost::json::object const& request, std::string const& clientIp) const { srand((unsigned)time(0)); auto sourceIdx = rand() % sources_.size(); auto numAttempts = 0; while (numAttempts < sources_.size()) { - if (auto res = sources_[sourceIdx]->forwardToRippled(request)) + if (auto res = sources_[sourceIdx]->forwardToRippled(request, clientIp)) return res; sourceIdx = (sourceIdx + 1) % sources_.size(); @@ -960,32 +939,10 @@ ETLLoadBalancer::forwardToRippled(boost::json::object const& request) const return {}; } -template -std::unique_ptr -ETLSourceImpl::getRippledForwardingStub() const -{ - if (!connected_) - return nullptr; - try - { - return org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub( - grpc::CreateChannel( - beast::IP::Endpoint( - boost::asio::ip::make_address(ip_), std::stoi(grpcPort_)) - .to_string(), - grpc::InsecureChannelCredentials())); - } - catch (std::exception const&) - { - BOOST_LOG_TRIVIAL(error) << "Failed to create grpc stub"; - return nullptr; - } -} - template std::optional ETLSourceImpl::forwardToRippled( - boost::json::object const& request) const + boost::json::object const& request, std::string const& clientIp) const { BOOST_LOG_TRIVIAL(debug) << "Attempting to forward request to tx. " << "request = " << boost::json::serialize(request); @@ -1026,17 +983,17 @@ ETLSourceImpl::forwardToRippled( // // https://github.com/ripple/rippled/blob/develop/cfg/rippled-example.cfg ws->set_option(websocket::stream_base::decorator( - [&request](websocket::request_type& req) { + [&request,&clientIp] (websocket::request_type& req) { req.set( http::field::user_agent, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-coro"); req.set( http::field::forwarded, - "for=" + boost::json::serialize(request)); + "for=" + clientIp); })); BOOST_LOG_TRIVIAL(debug) - << "client ip: " << boost::json::serialize(request); + << "client ip: " << clientIp; BOOST_LOG_TRIVIAL(debug) << "Performing websocket handshake"; // Perform the websocket handshake diff --git a/src/etl/ETLSource.h b/src/etl/ETLSource.h index 3c967ead..6a998894 100644 --- a/src/etl/ETLSource.h +++ b/src/etl/ETLSource.h @@ -55,11 +55,8 @@ public: std::uint32_t numMarkers, bool cacheOnly = false) = 0; - virtual std::unique_ptr - getRippledForwardingStub() const = 0; - virtual std::optional - forwardToRippled(boost::json::object const& request) const = 0; + forwardToRippled(boost::json::object const& request, std::string const& clientIp) const = 0; virtual ~ETLSource() { @@ -325,13 +322,8 @@ public: bool handleMessage(); - /// Get grpc stub to forward requests to rippled node - /// @return stub to send requests to ETL source - std::unique_ptr - getRippledForwardingStub() const override; - std::optional - forwardToRippled(boost::json::object const& request) const override; + forwardToRippled(boost::json::object const& request, std::string const& clientIp) const override; }; class PlainETLSource : public ETLSourceImpl @@ -560,16 +552,11 @@ public: return ret; } - /// Randomly select a rippled node to forward a gRPC request to - /// @return gRPC stub to forward requests to rippled node - std::unique_ptr - getRippledForwardingStub() const; - /// Forward a JSON RPC request to a randomly selected rippled node /// @param request JSON-RPC request /// @return response received from rippled node std::optional - forwardToRippled(boost::json::object const& request) const; + forwardToRippled(boost::json::object const& request, std::string const& clientIp) const; private: /// f is a function that takes an ETLSource as an argument and returns a diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index 5ea83333..34264dcd 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -12,6 +12,7 @@ #include #include #include +#include namespace detail { /// Convenience function for printing out basic ledger info diff --git a/src/rpc/RPC.cpp b/src/rpc/RPC.cpp index 89d6f2a8..17e723ce 100644 --- a/src/rpc/RPC.cpp +++ b/src/rpc/RPC.cpp @@ -11,7 +11,8 @@ make_WsContext( std::shared_ptr const& balancer, std::shared_ptr const& session, Backend::LedgerRange const& range, - Counters& counters) + Counters& counters, + std::string const& clientIp) { if (!request.contains("command")) return {}; @@ -27,7 +28,8 @@ make_WsContext( balancer, session, range, - counters}; + counters, + clientIp}; } std::optional @@ -37,7 +39,8 @@ make_HttpContext( std::shared_ptr const& subscriptions, std::shared_ptr const& balancer, Backend::LedgerRange const& range, - RPC::Counters& counters) + RPC::Counters& counters, + std::string const& clientIp) { if (!request.contains("method") || !request.at("method").is_string()) return {}; @@ -67,7 +70,8 @@ make_HttpContext( balancer, nullptr, range, - counters}; + counters, + clientIp}; } boost::json::object @@ -162,7 +166,7 @@ buildResponse(Context const& ctx) { if (shouldForwardToRippled(ctx)) { - auto res = ctx.balancer->forwardToRippled(ctx.params); + auto res = ctx.balancer->forwardToRippled(ctx.params, ctx.clientIp); ctx.counters.rpcForwarded(ctx.method); diff --git a/src/rpc/RPC.h b/src/rpc/RPC.h index 4ce270d7..dbd5da88 100644 --- a/src/rpc/RPC.h +++ b/src/rpc/RPC.h @@ -39,6 +39,7 @@ struct Context std::shared_ptr session; Backend::LedgerRange const& range; Counters& counters; + std::string clientIp; Context( std::string const& command_, @@ -49,7 +50,8 @@ struct Context std::shared_ptr const& balancer_, std::shared_ptr const& session_, Backend::LedgerRange const& range_, - Counters& counters_) + Counters& counters_, + std::string const& clientIp_) : method(command_) , version(version_) , params(params_) @@ -59,6 +61,7 @@ struct Context , session(session_) , range(range_) , counters(counters_) + , clientIp(clientIp_) { } }; @@ -138,7 +141,8 @@ make_WsContext( std::shared_ptr const& balancer, std::shared_ptr const& session, Backend::LedgerRange const& range, - Counters& counters); + Counters& counters, + std::string const& clientIp); std::optional make_HttpContext( @@ -147,7 +151,8 @@ make_HttpContext( std::shared_ptr const& subscriptions, std::shared_ptr const& balancer, Backend::LedgerRange const& range, - Counters& counters); + Counters& counters, + std::string const& clientIp); Result buildResponse(Context const& ctx); diff --git a/src/rpc/handlers/ServerInfo.cpp b/src/rpc/handlers/ServerInfo.cpp index eb97eaf8..b698a87a 100644 --- a/src/rpc/handlers/ServerInfo.cpp +++ b/src/rpc/handlers/ServerInfo.cpp @@ -31,7 +31,7 @@ doServerInfo(Context const& context) info["counters"].as_object()["rpc"] = context.counters.report(); } - auto serverInfoRippled = context.balancer->forwardToRippled(context.params); + auto serverInfoRippled = context.balancer->forwardToRippled(context.params, context.clientIp); if (serverInfoRippled && !serverInfoRippled->contains("error")) response["info"].as_object()["load_factor"] = 1; diff --git a/src/webserver/HttpBase.h b/src/webserver/HttpBase.h index 5a0a24fa..c18d29c7 100644 --- a/src/webserver/HttpBase.h +++ b/src/webserver/HttpBase.h @@ -126,13 +126,6 @@ handle_request( RPC::make_error(RPC::Error::rpcBAD_SYNTAX)))); } - if (!dosGuard.isOk(ip)) - return send(httpResponse( - http::status::ok, - "application/json", - boost::json::serialize( - RPC::make_error(RPC::Error::rpcSLOW_DOWN)))); - auto range = backend->fetchLedgerRange(); if (!range) return send(httpResponse( @@ -142,7 +135,7 @@ handle_request( RPC::make_error(RPC::Error::rpcNOT_READY)))); std::optional context = - RPC::make_HttpContext(request, backend, nullptr, balancer, *range, counters); + RPC::make_HttpContext(request, backend, nullptr, balancer, *range, counters, ip); if (!context) return send(httpResponse( @@ -183,8 +176,7 @@ handle_request( responseStr = boost::json::serialize(response); } - if (!dosGuard.add(ip, responseStr.size())) - result["warning"] = "Too many requests"; + dosGuard.add(ip, responseStr.size()); return send( httpResponse(http::status::ok, "application/json", responseStr)); diff --git a/src/webserver/WsBase.h b/src/webserver/WsBase.h index e7c04c9c..ec6ed269 100644 --- a/src/webserver/WsBase.h +++ b/src/webserver/WsBase.h @@ -243,7 +243,8 @@ public: balancer_, shared_from_this(), *range, - counters_); + counters_, + ip); if (!context) return sendError(RPC::Error::rpcBAD_SYNTAX);