20 #include <ripple/app/main/GRPCServer.h>
21 #include <ripple/app/reporting/P2pProxy.h>
22 #include <ripple/beast/core/CurrentThreadName.h>
23 #include <ripple/resource/Fees.h>
25 #include <ripple/beast/net/IPAddressConversion.h>
26 #include <ripple/core/ConfigSections.h>
44 peerClean = peer.
substr(first + 1);
60 template <
class Request,
class Response>
62 org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService& service,
63 grpc::ServerCompletionQueue& cq,
76 , bindListener_(
std::move(bindListener))
77 , handler_(
std::move(handler))
78 , forward_(
std::move(forward))
79 , requiredCondition_(
std::move(requiredCondition))
80 , loadType_(
std::move(loadType))
81 , secureGatewayIPs_(secureGatewayIPs)
88 template <
class Request,
class Response>
92 return std::make_shared<CallData<Request, Response>>(
104 template <
class Request,
class Response>
109 BOOST_ASSERT(!finished_);
112 this->shared_from_this();
127 thisShared->process(coro);
134 grpc::StatusCode::INTERNAL,
"Job Queue is already stopped"};
135 responder_.FinishWithError(status,
this);
139 template <
class Request,
class Response>
146 auto usage = getUsage();
151 grpc::StatusCode::RESOURCE_EXHAUSTED,
152 "usage balance exceeds threshold"};
153 responder_.FinishWithError(status,
this);
157 auto loadType = getLoadType();
158 usage.charge(loadType);
163 toLog <<
"role = " << (int)role;
165 toLog <<
" address = ";
166 if (
auto clientIp = getClientIpAddress())
167 toLog << clientIp.value();
170 if (
auto user = getUser())
171 toLog << user.value();
204 grpc::StatusCode::FAILED_PRECONDITION,
206 responder_.FinishWithError(status,
this);
215 responder_.Finish(result.
first, result.
second,
this);
227 grpc::Status status{grpc::StatusCode::INTERNAL, ex.
what()};
228 responder_.FinishWithError(status,
this);
232 template <
class Request,
class Response>
237 if (
auto descriptor =
238 Request::GetDescriptor()->FindFieldByName(
"client_ip"))
240 Request::GetReflection()->SetString(&request_, descriptor, ctx_.peer());
242 <<
"Set client_ip to " << ctx_.peer();
247 Throw<std::runtime_error>(
248 "Attempting to forward but no client_ip field in "
254 grpc::ClientContext clientContext;
256 auto status = forward_(stub.get(), &clientContext, request_, &response);
257 responder_.Finish(response, status,
this);
263 <<
"Failed to forward request to tx";
265 grpc::StatusCode::INTERNAL,
266 "Attempted to act as proxy but failed "
267 "to create forwarding stub"};
268 responder_.FinishWithError(status,
this);
272 template <
class Request,
class Response>
279 template <
class Request,
class Response>
286 template <
class Request,
class Response>
292 else if (wasForwarded())
298 template <
class Request,
class Response>
302 if (
auto descriptor =
303 Request::GetDescriptor()->FindFieldByName(
"client_ip"))
306 Request::GetReflection()->GetString(request_, descriptor);
307 if (!clientIp.
empty())
315 template <
class Request,
class Response>
319 if (
auto descriptor = Request::GetDescriptor()->FindFieldByName(
"user"))
322 Request::GetReflection()->GetString(request_, descriptor);
331 template <
class Request,
class Response>
335 auto endpoint = getClientEndpoint();
337 return endpoint->address();
341 template <
class Request,
class Response>
345 auto endpoint = getProxiedClientEndpoint();
347 return endpoint->address();
351 template <
class Request,
class Response>
355 auto descriptor = Request::GetDescriptor()->FindFieldByName(
"client_ip");
359 Request::GetReflection()->GetString(request_, descriptor);
360 if (!clientIp.
empty())
363 <<
"Got client_ip from request : " << clientIp;
364 return getEndpoint(clientIp);
370 template <
class Request,
class Response>
374 return getEndpoint(ctx_.peer());
377 template <
class Request,
class Response>
383 auto clientIp = getClientIpAddress();
384 auto proxiedIp = getProxiedClientIpAddress();
385 if (clientIp && !proxiedIp)
396 template <
class Request,
class Response>
404 if (
auto descriptor =
405 Response::GetDescriptor()->FindFieldByName(
"is_unlimited"))
407 Response::GetReflection()->SetBool(&response, descriptor,
true);
412 template <
class Request,
class Response>
416 auto endpoint = getClientEndpoint();
417 auto proxiedEndpoint = getProxiedClientEndpoint();
424 Throw<std::runtime_error>(
"Failed to get client endpoint");
435 auto const optIp = section.
get(
"ip");
439 auto const optPort = section.
get(
"port");
444 boost::asio::ip::tcp::endpoint endpoint(
445 boost::asio::ip::make_address(*optIp),
std::stoi(*optPort));
453 JLOG(
journal_.
error()) <<
"Error setting grpc server address";
454 Throw<std::runtime_error>(
"Error setting grpc server address");
457 auto const optSecureGateway = section.
get(
"secure_gateway");
458 if (optSecureGateway)
466 boost::algorithm::trim(ip);
467 auto const addr = boost::asio::ip::make_address(ip);
469 if (addr.is_unspecified())
472 <<
"Can't pass unspecified IP in "
473 <<
"secure_gateway section of port_grpc";
474 Throw<std::runtime_error>(
475 "Unspecified IP in secure_gateway section");
484 <<
"Error parsing secure gateway IPs for grpc server";
485 Throw<std::runtime_error>(
486 "Error parsing secure_gateway section");
511 JLOG(
journal_.
debug()) <<
"Completion Queue has been shutdown";
527 return sPtr.get() == ptr;
529 BOOST_ASSERT(it != requests.
end());
530 it->swap(requests.
back());
550 while (
cq_->Next(&tag, &ok))
554 <<
" ptr = " << ptr <<
" ok = " << ok;
559 <<
"Destroying object";
564 if (!ptr->isFinished())
566 JLOG(
journal_.
debug()) <<
"Received new request. Processing";
569 auto cloned = ptr->clone();
576 JLOG(
journal_.
debug()) <<
"Sent response. Destroying object";
590 auto addToRequests = [&requests](
auto callData) {
596 org::xrpl::rpc::v1::GetLedgerRequest,
597 org::xrpl::rpc::v1::GetLedgerResponse>;
599 addToRequests(std::make_shared<cd>(
603 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
606 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedger,
613 org::xrpl::rpc::v1::GetLedgerDataRequest,
614 org::xrpl::rpc::v1::GetLedgerDataResponse>;
616 addToRequests(std::make_shared<cd>(
620 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
621 RequestGetLedgerData,
623 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerData,
630 org::xrpl::rpc::v1::GetLedgerDiffRequest,
631 org::xrpl::rpc::v1::GetLedgerDiffResponse>;
633 addToRequests(std::make_shared<cd>(
637 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
638 RequestGetLedgerDiff,
640 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerDiff,
647 org::xrpl::rpc::v1::GetLedgerEntryRequest,
648 org::xrpl::rpc::v1::GetLedgerEntryResponse>;
650 addToRequests(std::make_shared<cd>(
654 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
655 RequestGetLedgerEntry,
657 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerEntry,
674 grpc::ServerBuilder builder;
676 builder.AddListeningPort(
serverAddress_, grpc::InsecureServerCredentials());
682 cq_ = builder.AddCompletionQueue();
684 server_ = builder.BuildAndStart();