mirror of
https://github.com/XRPLF/clio.git
synced 2026-04-29 15:37:53 +00:00
Forward client IP to rippled when proxying (#77)
This commit is contained in:
@@ -11,7 +11,9 @@ endif()
|
||||
add_library(clio)
|
||||
target_include_directories(clio PUBLIC src)
|
||||
|
||||
|
||||
include(FetchContent)
|
||||
include(ExternalProject)
|
||||
#include(CMake/deps/) # TODO: see if this works...
|
||||
include(CMake/settings.cmake)
|
||||
include(CMake/deps/rippled.cmake)
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
|
||||
namespace Backend {
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
#include <boost/asio.hpp>
|
||||
#include <boost/format.hpp>
|
||||
#include <backend/PostgresBackend.h>
|
||||
#include <thread>
|
||||
namespace Backend {
|
||||
|
||||
PostgresBackend::PostgresBackend(boost::json::object const& config)
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
#include <backend/DBHelpers.h>
|
||||
#include <etl/ETLSource.h>
|
||||
#include <etl/ReportingETL.h>
|
||||
#include <thread>
|
||||
|
||||
// Create ETL source without grpc endpoint
|
||||
// Fetch ledger and load initial ledger will fail for this source
|
||||
@@ -921,37 +922,15 @@ ETLLoadBalancer::fetchLedger(
|
||||
return {};
|
||||
}
|
||||
|
||||
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
|
||||
ETLLoadBalancer::getRippledForwardingStub() const
|
||||
{
|
||||
if (sources_.size() == 0)
|
||||
return nullptr;
|
||||
srand((unsigned)time(0));
|
||||
auto sourceIdx = rand() % sources_.size();
|
||||
auto numAttempts = 0;
|
||||
while (numAttempts < sources_.size())
|
||||
{
|
||||
auto stub = sources_[sourceIdx]->getRippledForwardingStub();
|
||||
if (!stub)
|
||||
{
|
||||
sourceIdx = (sourceIdx + 1) % sources_.size();
|
||||
++numAttempts;
|
||||
continue;
|
||||
}
|
||||
return stub;
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
std::optional<boost::json::object>
|
||||
ETLLoadBalancer::forwardToRippled(boost::json::object const& request) const
|
||||
ETLLoadBalancer::forwardToRippled(boost::json::object const& request, std::string const& clientIp) const
|
||||
{
|
||||
srand((unsigned)time(0));
|
||||
auto sourceIdx = rand() % sources_.size();
|
||||
auto numAttempts = 0;
|
||||
while (numAttempts < sources_.size())
|
||||
{
|
||||
if (auto res = sources_[sourceIdx]->forwardToRippled(request))
|
||||
if (auto res = sources_[sourceIdx]->forwardToRippled(request, clientIp))
|
||||
return res;
|
||||
|
||||
sourceIdx = (sourceIdx + 1) % sources_.size();
|
||||
@@ -960,32 +939,10 @@ ETLLoadBalancer::forwardToRippled(boost::json::object const& request) const
|
||||
return {};
|
||||
}
|
||||
|
||||
template <class Derived>
|
||||
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
|
||||
ETLSourceImpl<Derived>::getRippledForwardingStub() const
|
||||
{
|
||||
if (!connected_)
|
||||
return nullptr;
|
||||
try
|
||||
{
|
||||
return org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
|
||||
grpc::CreateChannel(
|
||||
beast::IP::Endpoint(
|
||||
boost::asio::ip::make_address(ip_), std::stoi(grpcPort_))
|
||||
.to_string(),
|
||||
grpc::InsecureChannelCredentials()));
|
||||
}
|
||||
catch (std::exception const&)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(error) << "Failed to create grpc stub";
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
template <class Derived>
|
||||
std::optional<boost::json::object>
|
||||
ETLSourceImpl<Derived>::forwardToRippled(
|
||||
boost::json::object const& request) const
|
||||
boost::json::object const& request, std::string const& clientIp) const
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(debug) << "Attempting to forward request to tx. "
|
||||
<< "request = " << boost::json::serialize(request);
|
||||
@@ -1026,17 +983,17 @@ ETLSourceImpl<Derived>::forwardToRippled(
|
||||
//
|
||||
// https://github.com/ripple/rippled/blob/develop/cfg/rippled-example.cfg
|
||||
ws->set_option(websocket::stream_base::decorator(
|
||||
[&request](websocket::request_type& req) {
|
||||
[&request,&clientIp] (websocket::request_type& req) {
|
||||
req.set(
|
||||
http::field::user_agent,
|
||||
std::string(BOOST_BEAST_VERSION_STRING) +
|
||||
" websocket-client-coro");
|
||||
req.set(
|
||||
http::field::forwarded,
|
||||
"for=" + boost::json::serialize(request));
|
||||
"for=" + clientIp);
|
||||
}));
|
||||
BOOST_LOG_TRIVIAL(debug)
|
||||
<< "client ip: " << boost::json::serialize(request);
|
||||
<< "client ip: " << clientIp;
|
||||
|
||||
BOOST_LOG_TRIVIAL(debug) << "Performing websocket handshake";
|
||||
// Perform the websocket handshake
|
||||
|
||||
@@ -55,11 +55,8 @@ public:
|
||||
std::uint32_t numMarkers,
|
||||
bool cacheOnly = false) = 0;
|
||||
|
||||
virtual std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
|
||||
getRippledForwardingStub() const = 0;
|
||||
|
||||
virtual std::optional<boost::json::object>
|
||||
forwardToRippled(boost::json::object const& request) const = 0;
|
||||
forwardToRippled(boost::json::object const& request, std::string const& clientIp) const = 0;
|
||||
|
||||
virtual ~ETLSource()
|
||||
{
|
||||
@@ -325,13 +322,8 @@ public:
|
||||
bool
|
||||
handleMessage();
|
||||
|
||||
/// Get grpc stub to forward requests to rippled node
|
||||
/// @return stub to send requests to ETL source
|
||||
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
|
||||
getRippledForwardingStub() const override;
|
||||
|
||||
std::optional<boost::json::object>
|
||||
forwardToRippled(boost::json::object const& request) const override;
|
||||
forwardToRippled(boost::json::object const& request, std::string const& clientIp) const override;
|
||||
};
|
||||
|
||||
class PlainETLSource : public ETLSourceImpl<PlainETLSource>
|
||||
@@ -560,16 +552,11 @@ public:
|
||||
return ret;
|
||||
}
|
||||
|
||||
/// Randomly select a rippled node to forward a gRPC request to
|
||||
/// @return gRPC stub to forward requests to rippled node
|
||||
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
|
||||
getRippledForwardingStub() const;
|
||||
|
||||
/// Forward a JSON RPC request to a randomly selected rippled node
|
||||
/// @param request JSON-RPC request
|
||||
/// @return response received from rippled node
|
||||
std::optional<boost::json::object>
|
||||
forwardToRippled(boost::json::object const& request) const;
|
||||
forwardToRippled(boost::json::object const& request, std::string const& clientIp) const;
|
||||
|
||||
private:
|
||||
/// f is a function that takes an ETLSource as an argument and returns a
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
#include <string>
|
||||
#include <subscriptions/SubscriptionManager.h>
|
||||
#include <variant>
|
||||
#include <thread>
|
||||
|
||||
namespace detail {
|
||||
/// Convenience function for printing out basic ledger info
|
||||
|
||||
@@ -11,7 +11,8 @@ make_WsContext(
|
||||
std::shared_ptr<ETLLoadBalancer> const& balancer,
|
||||
std::shared_ptr<WsBase> const& session,
|
||||
Backend::LedgerRange const& range,
|
||||
Counters& counters)
|
||||
Counters& counters,
|
||||
std::string const& clientIp)
|
||||
{
|
||||
if (!request.contains("command"))
|
||||
return {};
|
||||
@@ -27,7 +28,8 @@ make_WsContext(
|
||||
balancer,
|
||||
session,
|
||||
range,
|
||||
counters};
|
||||
counters,
|
||||
clientIp};
|
||||
}
|
||||
|
||||
std::optional<Context>
|
||||
@@ -37,7 +39,8 @@ make_HttpContext(
|
||||
std::shared_ptr<SubscriptionManager> const& subscriptions,
|
||||
std::shared_ptr<ETLLoadBalancer> const& balancer,
|
||||
Backend::LedgerRange const& range,
|
||||
RPC::Counters& counters)
|
||||
RPC::Counters& counters,
|
||||
std::string const& clientIp)
|
||||
{
|
||||
if (!request.contains("method") || !request.at("method").is_string())
|
||||
return {};
|
||||
@@ -67,7 +70,8 @@ make_HttpContext(
|
||||
balancer,
|
||||
nullptr,
|
||||
range,
|
||||
counters};
|
||||
counters,
|
||||
clientIp};
|
||||
}
|
||||
|
||||
boost::json::object
|
||||
@@ -162,7 +166,7 @@ buildResponse(Context const& ctx)
|
||||
{
|
||||
if (shouldForwardToRippled(ctx))
|
||||
{
|
||||
auto res = ctx.balancer->forwardToRippled(ctx.params);
|
||||
auto res = ctx.balancer->forwardToRippled(ctx.params, ctx.clientIp);
|
||||
|
||||
ctx.counters.rpcForwarded(ctx.method);
|
||||
|
||||
|
||||
@@ -39,6 +39,7 @@ struct Context
|
||||
std::shared_ptr<WsBase> session;
|
||||
Backend::LedgerRange const& range;
|
||||
Counters& counters;
|
||||
std::string clientIp;
|
||||
|
||||
Context(
|
||||
std::string const& command_,
|
||||
@@ -49,7 +50,8 @@ struct Context
|
||||
std::shared_ptr<ETLLoadBalancer> const& balancer_,
|
||||
std::shared_ptr<WsBase> const& session_,
|
||||
Backend::LedgerRange const& range_,
|
||||
Counters& counters_)
|
||||
Counters& counters_,
|
||||
std::string const& clientIp_)
|
||||
: method(command_)
|
||||
, version(version_)
|
||||
, params(params_)
|
||||
@@ -59,6 +61,7 @@ struct Context
|
||||
, session(session_)
|
||||
, range(range_)
|
||||
, counters(counters_)
|
||||
, clientIp(clientIp_)
|
||||
{
|
||||
}
|
||||
};
|
||||
@@ -138,7 +141,8 @@ make_WsContext(
|
||||
std::shared_ptr<ETLLoadBalancer> const& balancer,
|
||||
std::shared_ptr<WsBase> const& session,
|
||||
Backend::LedgerRange const& range,
|
||||
Counters& counters);
|
||||
Counters& counters,
|
||||
std::string const& clientIp);
|
||||
|
||||
std::optional<Context>
|
||||
make_HttpContext(
|
||||
@@ -147,7 +151,8 @@ make_HttpContext(
|
||||
std::shared_ptr<SubscriptionManager> const& subscriptions,
|
||||
std::shared_ptr<ETLLoadBalancer> const& balancer,
|
||||
Backend::LedgerRange const& range,
|
||||
Counters& counters);
|
||||
Counters& counters,
|
||||
std::string const& clientIp);
|
||||
|
||||
Result
|
||||
buildResponse(Context const& ctx);
|
||||
|
||||
@@ -31,7 +31,7 @@ doServerInfo(Context const& context)
|
||||
info["counters"].as_object()["rpc"] = context.counters.report();
|
||||
}
|
||||
|
||||
auto serverInfoRippled = context.balancer->forwardToRippled(context.params);
|
||||
auto serverInfoRippled = context.balancer->forwardToRippled(context.params, context.clientIp);
|
||||
if (serverInfoRippled && !serverInfoRippled->contains("error"))
|
||||
response["info"].as_object()["load_factor"] = 1;
|
||||
|
||||
|
||||
@@ -126,13 +126,6 @@ handle_request(
|
||||
RPC::make_error(RPC::Error::rpcBAD_SYNTAX))));
|
||||
}
|
||||
|
||||
if (!dosGuard.isOk(ip))
|
||||
return send(httpResponse(
|
||||
http::status::ok,
|
||||
"application/json",
|
||||
boost::json::serialize(
|
||||
RPC::make_error(RPC::Error::rpcSLOW_DOWN))));
|
||||
|
||||
auto range = backend->fetchLedgerRange();
|
||||
if (!range)
|
||||
return send(httpResponse(
|
||||
@@ -142,7 +135,7 @@ handle_request(
|
||||
RPC::make_error(RPC::Error::rpcNOT_READY))));
|
||||
|
||||
std::optional<RPC::Context> context =
|
||||
RPC::make_HttpContext(request, backend, nullptr, balancer, *range, counters);
|
||||
RPC::make_HttpContext(request, backend, nullptr, balancer, *range, counters, ip);
|
||||
|
||||
if (!context)
|
||||
return send(httpResponse(
|
||||
@@ -183,8 +176,7 @@ handle_request(
|
||||
responseStr = boost::json::serialize(response);
|
||||
}
|
||||
|
||||
if (!dosGuard.add(ip, responseStr.size()))
|
||||
result["warning"] = "Too many requests";
|
||||
dosGuard.add(ip, responseStr.size());
|
||||
|
||||
return send(
|
||||
httpResponse(http::status::ok, "application/json", responseStr));
|
||||
|
||||
@@ -243,7 +243,8 @@ public:
|
||||
balancer_,
|
||||
shared_from_this(),
|
||||
*range,
|
||||
counters_);
|
||||
counters_,
|
||||
ip);
|
||||
|
||||
if (!context)
|
||||
return sendError(RPC::Error::rpcBAD_SYNTAX);
|
||||
|
||||
Reference in New Issue
Block a user