rippled
Loading...
Searching...
No Matches
GRPCServer.cpp
1#include <xrpld/app/main/GRPCServer.h>
2#include <xrpld/core/ConfigSections.h>
3
4#include <xrpl/beast/core/CurrentThreadName.h>
5#include <xrpl/beast/net/IPAddressConversion.h>
6#include <xrpl/resource/Fees.h>
7
8namespace xrpl {
9
10namespace {
11
12// helper function. converts string to endpoint. handles ipv4 and ipv6, with or
13// without port, with or without prepended scheme
//
// Behaviour visible here:
//  - If `peer` contains more than one ':', everything up to and including the
//    first ':' is removed before parsing; otherwise the string is used as is.
//  - When `endpoint` tests true it is converted with
//    beast::IP::to_asio_endpoint and returned.
//  - Any std::exception raised inside the try block is swallowed, and the
//    function then falls through to `return {}`.
// The return-type line and the declaration of `endpoint` are not visible in
// this listing; presumably `endpoint` is parsed from `peerClean` - confirm.
// NOTE(review): a bare IPv6 literal such as "::1" also contains several ':',
// so its leading ':' would be stripped here - confirm that callers always
// pass a scheme-prefixed peer string.
15getEndpoint(std::string const& peer)
16{
17 try
18 {
19 std::size_t first = peer.find_first_of(":");
20 std::size_t last = peer.find_last_of(":");
21 std::string peerClean(peer);
    // Two different ':' positions: drop the prefix that ends at the first one.
22 if (first != last)
23 {
24 peerClean = peer.substr(first + 1);
25 }
26
28 if (endpoint)
29 return beast::IP::to_asio_endpoint(endpoint.value());
30 }
31 catch (std::exception const&)
32 {
    // Deliberately empty: a failure is reported through the empty return below.
33 }
34 return {};
35}
36
37} // namespace
38
// Constructor: stores the service, completion queue and application
// references, starts with finished_ == false, builds responder_ from the
// address of ctx_, moves bindListener / handler / forward / requiredCondition
// / loadType into their members and initialises secureGatewayIPs_ from the
// argument.
// The line naming the constructor, the bindListener / handler / forward
// parameter lines and the last statement of the body are not visible in this
// listing; going by the comment in the body, that statement presumably binds
// the listener - confirm against the real file.
39template <class Request, class Response>
41 org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService& service,
42 grpc::ServerCompletionQueue& cq,
43 Application& app,
47 RPC::Condition requiredCondition,
48 Resource::Charge loadType,
49 std::vector<boost::asio::ip::address> const& secureGatewayIPs)
50 : service_(service)
51 , cq_(cq)
52 , finished_(false)
53 , app_(app)
54 , responder_(&ctx_)
55 , bindListener_(std::move(bindListener))
56 , handler_(std::move(handler))
57 , forward_(std::move(forward))
58 , requiredCondition_(std::move(requiredCondition))
59 , loadType_(std::move(loadType))
60 , secureGatewayIPs_(secureGatewayIPs)
61{
62 // Bind a listener. When a request is received, "this" will be returned
63 // from CompletionQueue::Next
65}
66
// Passes this object's own members (service, queue, app, listener binder,
// handler, forwarder, required condition, load type, secure gateway list) as
// an argument list. The signature and the expression receiving these
// arguments are not visible in this listing; presumably a new CallData is
// created from them - confirm.
67template <class Request, class Response>
70{
72 service_, cq_, app_, bindListener_, handler_, forward_, requiredCondition_, loadType_, secureGatewayIPs_);
73}
74
// Hands a received request over to the job queue (the line naming this member
// is not visible in this listing). It asserts the object has not finished
// yet, marks it finished, and posts a jtRPC coroutine named "gRPC-Client"
// that calls process(coro) on this object. If postCoro returns null the
// request is answered right away with an INTERNAL status.
75template <class Request, class Response>
76void
78{
79 // sanity check
80 BOOST_ASSERT(!finished_);
81
    // The lambda below captures this shared_ptr by value, so it holds its own
    // owning reference to this object.
82 std::shared_ptr<CallData<Request, Response>> thisShared = this->shared_from_this();
83
84 // Need to set finished to true before processing the response,
85 // because as soon as the response is posted to the completion
86 // queue (via responder_.Finish(...) or responder_.FinishWithError(...)),
87 // the CallData object is returned as a tag in handleRpcs().
88 // handleRpcs() checks the finished variable, and if true, destroys
89 // the object. Setting finished to true before calling process
90 // ensures that finished is always true when this CallData object
91 // is returned as a tag in handleRpcs(), after sending the response
92 finished_ = true;
93 auto coro =
94 app_.getJobQueue().postCoro(JobType::jtRPC, "gRPC-Client", [thisShared](std::shared_ptr<JobQueue::Coro> coro) {
95 thisShared->process(coro);
96 });
97
98 // If coro is null, then the JobQueue has already been shutdown
99 if (!coro)
100 {
101 grpc::Status status{grpc::StatusCode::INTERNAL, "Job Queue is already stopped"};
102 responder_.FinishWithError(status, this);
103 }
104}
105
// Handles one request and always answers through responder_ (the signature
// line is not visible in this listing; `coro` is used in the body and is
// presumably the parameter). Steps visible here:
//  1. If the client is not unlimited and usage.disconnect(...) is true,
//     reply RESOURCE_EXHAUSTED.
//  2. Otherwise charge the load type, work out the role and log a debug line
//     with role, address, user and isUnlimited.
//  3. Check requiredCondition_ against the context; on failure reply
//     FAILED_PRECONDITION with the error's message.
//  4. Otherwise call handler_, flag the response via setIsUnlimited and
//     finish with the handler's status.
// Any std::exception thrown along the way is turned into an INTERNAL status
// carrying ex.what().
// Parts of the `context` construction are on lines not visible here.
106template <class Request, class Response>
107void
109{
110 try
111 {
112 auto usage = getUsage();
113 bool isUnlimited = clientIsUnlimited();
114 if (!isUnlimited && usage.disconnect(app_.journal("gRPCServer")))
115 {
116 grpc::Status status{grpc::StatusCode::RESOURCE_EXHAUSTED, "usage balance exceeds threshold"};
117 responder_.FinishWithError(status, this);
118 }
119 else
120 {
121 auto loadType = getLoadType();
122 usage.charge(loadType);
123 auto role = getRole(isUnlimited);
124
    // Scoped block: build and emit a single debug log line for this request.
125 {
126 std::stringstream toLog;
127 toLog << "role = " << (int)role;
128
129 toLog << " address = ";
130 if (auto clientIp = getClientIpAddress())
131 toLog << clientIp.value();
132
133 toLog << " user = ";
134 if (auto user = getUser())
135 toLog << user.value();
136 toLog << " isUnlimited = " << isUnlimited;
137
138 JLOG(app_.journal("GRPCServer::Calldata").debug()) << toLog.str();
139 }
140
    // Initialiser for `context` (its declaration line and some of its
    // members are not visible in this listing).
142 {app_.journal("gRPCServer"),
143 app_,
144 loadType,
145 app_.getOPs(),
147 usage,
148 role,
149 coro,
151 apiVersion},
152 request_};
153
154 // Make sure we can currently handle the rpc
155 error_code_i conditionMetRes = RPC::conditionMet(requiredCondition_, context);
156
157 if (conditionMetRes != rpcSUCCESS)
158 {
159 RPC::ErrorInfo errorInfo = RPC::get_error_info(conditionMetRes);
160 grpc::Status status{grpc::StatusCode::FAILED_PRECONDITION, errorInfo.message.c_str()};
161 responder_.FinishWithError(status, this);
162 }
163 else
164 {
165 std::pair<Response, grpc::Status> result = handler_(context);
166 setIsUnlimited(result.first, isUnlimited);
167 responder_.Finish(result.first, result.second, this);
168 }
169 }
170 }
171 catch (std::exception const& ex)
172 {
173 grpc::Status status{grpc::StatusCode::INTERNAL, ex.what()};
174 responder_.FinishWithError(status, this);
175 }
176}
177
178template <class Request, class Response>
179bool
184
185template <class Request, class Response>
191
192template <class Request, class Response>
193Role
201
// Reads an optional "user" value from the request (the signature lines are
// not visible in this listing). If Request's descriptor has a field named
// "user", its string value is fetched through reflection and returned when
// non-empty; in every other case the function returns `{}`.
202template <class Request, class Response>
205{
206 if (auto descriptor = Request::GetDescriptor()->FindFieldByName("user"))
207 {
208 std::string user = Request::GetReflection()->GetString(request_, descriptor);
209 if (!user.empty())
210 {
211 return user;
212 }
213 }
214 return {};
215}
216
// Returns the address part of the client endpoint when getClientEndpoint()
// yields one, otherwise `{}` (the signature lines are not visible in this
// listing).
217template <class Request, class Response>
220{
221 auto endpoint = getClientEndpoint();
222 if (endpoint)
223 return endpoint->address();
224 return {};
225}
226
// Converts the peer string reported by ctx_ into an endpoint using the
// file-local getEndpoint helper (the signature lines are not visible in this
// listing).
227template <class Request, class Response>
230{
231 return xrpl::getEndpoint(ctx_.peer());
232}
233
// Decides whether the caller is exempt from resource limits (the line naming
// this member is not visible in this listing). Returns true only when
// getUser() yields a value AND the client's IP address equals one of the
// entries in secureGatewayIPs_; returns false in every other case, including
// when no client IP could be determined.
234template <class Request, class Response>
235bool
237{
238 if (!getUser())
239 return false;
240 auto clientIp = getClientIpAddress();
241 if (clientIp)
242 {
243 for (auto& ip : secureGatewayIPs_)
244 {
245 if (ip == clientIp)
246 return true;
247 }
248 }
249 return false;
250}
251
// When isUnlimited is true and Response's descriptor has a field named
// "is_unlimited", sets that field to true on `response` through reflection.
// Does nothing otherwise (the signature line is not visible in this listing;
// `response` and `isUnlimited` are presumably its parameters).
252template <class Request, class Response>
253void
255{
256 if (isUnlimited)
257 {
258 if (auto descriptor = Response::GetDescriptor()->FindFieldByName("is_unlimited"))
259 {
260 Response::GetReflection()->SetBool(&response, descriptor, true);
261 }
262 }
263}
264
// Looks up the client endpoint and, if none is available, raises
// std::runtime_error("Failed to get client endpoint").
// The signature lines and the statement guarded by the `if` are not visible
// in this listing; presumably that statement returns a resource consumer for
// the endpoint - confirm against the real file.
265template <class Request, class Response>
268{
269 auto endpoint = getClientEndpoint();
270 if (endpoint)
272 Throw<std::runtime_error>("Failed to get client endpoint");
273}
274
// Reads the SECTION_PORT_GRPC config section, if present, and derives the
// server address from it.
//  - Missing "ip" or "port": returns silently, leaving serverAddress_ unset.
//  - "ip"/"port" that fail to parse: logs an error and raises
//    std::runtime_error("Error setting grpc server address").
//  - Optional "secure_gateway": a comma-separated list; each entry is trimmed
//    and parsed as an IP address. Any failure raises
//    std::runtime_error("Error parsing secure_gateway section").
// The declaration of `ss` in the first try block and the last statement of
// the secure_gateway loop are on lines not visible in this listing.
// NOTE(review): the std::stoi result is passed straight to the endpoint
// constructor without a 0-65535 range check - confirm whether out-of-range
// ports should be rejected.
275GRPCServerImpl::GRPCServerImpl(Application& app) : app_(app), journal_(app_.journal("gRPC Server"))
276{
277 // if present, get endpoint from config
278 if (app_.config().exists(SECTION_PORT_GRPC))
279 {
280 Section const& section = app_.config().section(SECTION_PORT_GRPC);
281
282 auto const optIp = section.get("ip");
283 if (!optIp)
284 return;
285
286 auto const optPort = section.get("port");
287 if (!optPort)
288 return;
289 try
290 {
291 boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::make_address(*optIp), std::stoi(*optPort));
292
    // The endpoint is streamed to text and that text becomes serverAddress_.
294 ss << endpoint;
295 serverAddress_ = ss.str();
296 }
297 catch (std::exception const&)
298 {
299 JLOG(journal_.error()) << "Error setting grpc server address";
300 Throw<std::runtime_error>("Error setting grpc server address");
301 }
302
303 auto const optSecureGateway = section.get("secure_gateway");
304 if (optSecureGateway)
305 {
306 try
307 {
308 std::stringstream ss{*optSecureGateway};
309 std::string ip;
310 while (std::getline(ss, ip, ','))
311 {
312 boost::algorithm::trim(ip);
313 auto const addr = boost::asio::ip::make_address(ip);
314
315 if (addr.is_unspecified())
316 {
317 JLOG(journal_.error()) << "Can't pass unspecified IP in "
318 << "secure_gateway section of port_grpc";
    // NOTE(review): assuming Throw<> raises the named exception, this
    // runtime_error is caught by the catch below and replaced with
    // "Error parsing secure_gateway section".
319 Throw<std::runtime_error>("Unspecified IP in secure_gateway section");
320 }
321
323 }
324 }
325 catch (std::exception const&)
326 {
327 JLOG(journal_.error()) << "Error parsing secure gateway IPs for grpc server";
328 Throw<std::runtime_error>("Error parsing secure_gateway section");
329 }
330 }
331 }
332}
333
// Stops the gRPC server and then its completion queue, logging a debug line
// before and after each step (the line naming this member is not visible in
// this listing). The order matters: the server is shut down first and the
// queue second, as the comments below explain.
334void
336{
337 JLOG(journal_.debug()) << "Shutting down";
338
339 // The below call cancels all "listeners" (CallData objects that are waiting
340 // for a request, as opposed to processing a request), and blocks until all
341 // requests being processed are completed. CallData objects in the midst of
342 // processing requests need to actually send data back to the client, via
343 // responder_.Finish(...) or responder_.FinishWithError(...), for this call
344 // to unblock. Each cancelled listener is returned via cq_.Next(...) with ok
345 // set to false
346 server_->Shutdown();
347 JLOG(journal_.debug()) << "Server has been shutdown";
348
349 // Always shutdown the completion queue after the server. This call allows
350 // cq_.Next() to return false, once all events posted to the completion
351 // queue have been processed. See handleRpcs() for more details.
352 cq_->Shutdown();
353 JLOG(journal_.debug()) << "Completion Queue has been shutdown";
354}
355
// Event loop over the completion queue (the line naming this member and the
// declaration of `requests` are not visible in this listing). For every tag
// returned by cq_->Next():
//  - ok == false: the object is removed from `requests`.
//  - ok == true and the object is not finished: a clone is appended to
//    `requests` and process() is called on the original.
//  - ok == true and the object is finished: it is removed from `requests`.
// The loop ends when cq_->Next() returns false.
356void
358{
359 // This collection should really be an unordered_set. However, to delete
360 // from the unordered_set, we need a shared_ptr, but cq_.Next() (see below
361 // while loop) sets the tag to a raw pointer.
363
    // Removes the entry owning `ptr`: linear search, then swap with the last
    // element and pop it, so element order is not preserved.
364 auto erase = [&requests](Processor* ptr) {
365 auto it = std::find_if(
366 requests.begin(), requests.end(), [ptr](std::shared_ptr<Processor>& sPtr) { return sPtr.get() == ptr; });
367 BOOST_ASSERT(it != requests.end());
368 it->swap(requests.back());
369 requests.pop_back();
370 };
371
372 void* tag; // uniquely identifies a request.
373 bool ok;
374 // Block waiting to read the next event from the completion queue. The
375 // event is uniquely identified by its tag, which in this case is the
376 // memory address of a CallData instance.
377 // The return value of Next should always be checked. This return value
378 // tells us whether there is any kind of event or cq_ is shutting down.
379 // When cq_.Next(...) returns false, all work has been completed and the
380 // loop can exit. When the server is shutdown, each CallData object that is
381 // listening for a request is forcibly cancelled, and is returned by
382 // cq_->Next() with ok set to false. Then, each CallData object processing
383 // a request must complete (by sending data to the client), each of which
384 // will be returned from cq_->Next() with ok set to true. After all
385 // cancelled listeners and all CallData objects processing requests are
386 // returned via cq_->Next(), cq_->Next() will return false, causing the
387 // loop to exit.
388 while (cq_->Next(&tag, &ok))
389 {
390 auto ptr = static_cast<Processor*>(tag);
391 JLOG(journal_.trace()) << "Processing CallData object."
392 << " ptr = " << ptr << " ok = " << ok;
393
394 if (!ok)
395 {
396 JLOG(journal_.debug()) << "Request listener cancelled. "
397 << "Destroying object";
398 erase(ptr);
399 }
400 else
401 {
402 if (!ptr->isFinished())
403 {
404 JLOG(journal_.debug()) << "Received new request. Processing";
405 // ptr is now processing a request, so create a new CallData
406 // object to handle additional requests
407 auto cloned = ptr->clone();
408 requests.push_back(cloned);
409 // process the request
410 ptr->process();
411 }
412 else
413 {
414 JLOG(journal_.debug()) << "Sent response. Destroying object";
415 erase(ptr);
416 }
417 }
418 }
419 JLOG(journal_.debug()) << "Completion Queue drained";
420}
421
422// create a CallData instance for each RPC
//
// Appends one shared CallData-style object to `requests` for each of four
// RPCs - GetLedger, GetLedgerData, GetLedgerDiff and GetLedgerEntry - and
// returns `requests`. Each object receives service_, *cq_, app_, the matching
// AsyncService::Request* member and the matching Stub member.
// The signature, the declaration of `requests`, the per-RPC `cd` alias and
// the remaining constructor arguments are on lines not visible in this
// listing.
425{
427
    // Helper that moves a freshly created object into `requests`.
428 auto addToRequests = [&requests](auto callData) { requests.push_back(std::move(callData)); };
429
430 {
432
433 addToRequests(std::make_shared<cd>(
434 service_,
435 *cq_,
436 app_,
437 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::RequestGetLedger,
439 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedger,
443 }
444 {
446
447 addToRequests(std::make_shared<cd>(
448 service_,
449 *cq_,
450 app_,
451 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::RequestGetLedgerData,
453 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerData,
457 }
458 {
460
461 addToRequests(std::make_shared<cd>(
462 service_,
463 *cq_,
464 app_,
465 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::RequestGetLedgerDiff,
467 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerDiff,
471 }
472 {
474
475 addToRequests(std::make_shared<cd>(
476 service_,
477 *cq_,
478 app_,
479 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::RequestGetLedgerEntry,
481 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerEntry,
485 }
486 return requests;
487}
488
// Builds and starts the gRPC server on serverAddress_ (the line naming this
// member is not visible in this listing). Returns false without doing
// anything when serverAddress_ is empty. Otherwise it registers the listening
// port with insecure credentials, registers service_, creates the completion
// queue, starts the server, and returns true iff the recorded port is
// non-zero.
// NOTE(review): server_ is not null-checked after BuildAndStart(); success is
// inferred only from the port value - confirm this is sufficient.
489bool
491{
492 // if config does not specify a grpc server address, don't start
493 if (serverAddress_.empty())
494 return false;
495
496 JLOG(journal_.info()) << "Starting gRPC server at " << serverAddress_;
497
498 grpc::ServerBuilder builder;
499
500 // Listen on the given address without any authentication mechanism.
501 // The port actually bound is written to the "port" variable.
502 int port = 0;
503 builder.AddListeningPort(serverAddress_, grpc::InsecureServerCredentials(), &port);
504 // Register "service_" as the instance through which we'll communicate with
505 // clients. In this case it corresponds to an *asynchronous* service.
506 builder.RegisterService(&service_);
507 // Get hold of the completion queue used for the asynchronous communication
508 // with the gRPC runtime.
509 cq_ = builder.AddCompletionQueue();
510 // Finally assemble the server.
511 server_ = builder.BuildAndStart();
512 serverPort_ = static_cast<std::uint16_t>(port);
513
514 return static_cast<bool>(serverPort_);
515}
516
// Returns a TCP endpoint built from `addr` and serverPort_. The line naming
// this member and the definition of `addr` are not visible in this listing;
// presumably `addr` is the host part of serverAddress_ - confirm.
517boost::asio::ip::tcp::endpoint
519{
521 return boost::asio::ip::tcp::endpoint(boost::asio::ip::make_address(addr), serverPort_);
522}
523
// Starts the implementation and, if that succeeds, launches thread_ to run
// the request loop (the line naming this member is not visible in this
// listing). Returns the value stored in running_, i.e. the result of
// impl_.start().
524bool
526{
527 // Start the server and setup listeners
528 if (running_ = impl_.start(); running_)
529 {
    // The lambda captures `this`; stop() joins thread_ while running_ is set.
530 thread_ = std::thread([this]() {
531 // Start the event loop and begin handling requests
532 beast::setCurrentThreadName("rippled: grpc");
533 this->impl_.handleRpcs();
534 });
535 }
536 return running_;
537}
538
// Stops the server if it is running: shuts down impl_, joins thread_ and
// clears running_. Does nothing when running_ is already false (the line
// naming this member is not visible in this listing).
539void
541{
542 if (running_)
543 {
544 impl_.shutdown();
545 thread_.join();
546 running_ = false;
547 }
548}
549
// Body of the destructor (per the assertion message; its signature line is
// not visible in this listing). It only asserts that running_ is false and
// performs no shutdown or join itself.
551{
552 XRPL_ASSERT(!running_, "xrpl::GRPCServer::~GRPCServer : is not running");
553}
554
// Forwards to impl_.getEndpoint() and returns its result unchanged (the line
// naming this member is not visible in this listing).
555boost::asio::ip::tcp::endpoint
557{
558 return impl_.getEndpoint();
559}
560
561} // namespace xrpl
T back(T... args)
T begin(T... args)
constexpr char const * c_str() const
Definition json_value.h:58
static std::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
Stream info() const
Definition Journal.h:307
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
virtual Config & config()=0
virtual beast::Journal journal(std::string const &name)=0
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Section & section(std::string const &name)
Returns the section with the given name.
Resource::Consumer getUsage()
std::optional< boost::asio::ip::address > getClientIpAddress()
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService & service_
Definition GRPCServer.h:132
virtual bool isFinished() override
std::optional< std::string > getUser()
CallData(org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService &service, grpc::ServerCompletionQueue &cq, Application &app, BindListener< Request, Response > bindListener, Handler< Request, Response > handler, Forward< Request, Response > forward, RPC::Condition requiredCondition, Resource::Charge loadType, std::vector< boost::asio::ip::address > const &secureGatewayIPs)
BindListener< Request, Response > bindListener_
Definition GRPCServer.h:158
Role getRole(bool isUnlimited)
Resource::Charge getLoadType()
std::optional< boost::asio::ip::tcp::endpoint > getClientEndpoint()
virtual void process() override
grpc::ServerAsyncResponseWriter< Response > responder_
Definition GRPCServer.h:155
std::shared_ptr< Processor > clone() override
void setIsUnlimited(Response &response, bool isUnlimited)
grpc::ServerCompletionQueue & cq_
Definition GRPCServer.h:135
grpc::ServerContext ctx_
Definition GRPCServer.h:140
static unsigned constexpr apiVersion
Definition GRPCServer.h:90
std::uint16_t serverPort_
Definition GRPCServer.h:66
Application & app_
Definition GRPCServer.h:63
std::vector< boost::asio::ip::address > secureGatewayIPs_
Definition GRPCServer.h:68
std::unique_ptr< grpc::ServerCompletionQueue > cq_
Definition GRPCServer.h:54
std::vector< std::shared_ptr< Processor > > setupListeners()
GRPCServerImpl(Application &app)
beast::Journal journal_
Definition GRPCServer.h:70
std::unique_ptr< grpc::Server > server_
Definition GRPCServer.h:61
boost::asio::ip::tcp::endpoint getEndpoint() const
std::string serverAddress_
Definition GRPCServer.h:65
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService service_
Definition GRPCServer.h:59
std::thread thread_
Definition GRPCServer.h:299
boost::asio::ip::tcp::endpoint getEndpoint() const
GRPCServerImpl impl_
Definition GRPCServer.h:298
std::shared_ptr< InfoSub > pointer
Definition InfoSub.h:35
std::shared_ptr< Coro > postCoro(JobType t, std::string const &name, F &&f)
Creates a coroutine and adds a job to the queue which will run it.
Definition JobQueue.h:387
A consumption charge.
Definition Charge.h:11
An endpoint that consumes resources.
Definition Consumer.h:17
virtual Consumer newInboundEndpoint(beast::IP::Endpoint const &address)=0
Create a new endpoint keyed by inbound IP address or the forwarded IP if proxied.
Holds a collection of configuration values.
Definition BasicConfig.h:25
std::optional< T > get(std::string const &name) const
virtual JobQueue & getJobQueue()=0
virtual NetworkOPs & getOPs()=0
virtual Resource::Manager & getResourceManager()=0
virtual LedgerMaster & getLedgerMaster()=0
T emplace_back(T... args)
T empty(T... args)
T end(T... args)
T find_first_of(T... args)
T find_if(T... args)
T find_last_of(T... args)
T getline(T... args)
T is_same_v
T join(T... args)
boost::asio::ip::tcp::endpoint to_asio_endpoint(Endpoint const &endpoint)
Convert to asio::ip::tcp::endpoint.
Endpoint from_asio(boost::asio::ip::address const &address)
Convert to Endpoint.
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
STL namespace.
@ NO_CONDITION
Definition Handler.h:21
ErrorInfo const & get_error_info(error_code_i code)
Returns an ErrorInfo that reflects the error code.
error_code_i conditionMet(Condition condition_required, T &context)
Definition Handler.h:60
Charge const feeMediumBurdenRPC
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
std::pair< org::xrpl::rpc::v1::GetLedgerDiffResponse, grpc::Status > doLedgerDiffGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerDiffRequest > &context)
Definition LedgerDiff.cpp:6
std::pair< org::xrpl::rpc::v1::GetLedgerDataResponse, grpc::Status > doLedgerDataGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerDataRequest > &context)
Role
Indicates the level of administrative permission to grant.
Definition Role.h:25
std::pair< org::xrpl::rpc::v1::GetLedgerResponse, grpc::Status > doLedgerGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerRequest > &context)
@ jtRPC
Definition Job.h:32
std::pair< org::xrpl::rpc::v1::GetLedgerEntryResponse, grpc::Status > doLedgerEntryGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerEntryRequest > &context)
void erase(STObject &st, TypedField< U > const &f)
Remove a field in an STObject.
Definition STExchange.h:149
bool isUnlimited(Role const &role)
ADMIN and IDENTIFIED roles shall have unlimited resources.
Definition Role.cpp:96
error_code_i
Definition ErrorCodes.h:21
@ rpcSUCCESS
Definition ErrorCodes.h:25
T pop_back(T... args)
T push_back(T... args)
T stoi(T... args)
T str(T... args)
Maps an rpc error code to its token, default message, and HTTP status.
Definition ErrorCodes.h:169
Json::StaticString message
Definition ErrorCodes.h:187
T substr(T... args)
T value(T... args)
T what(T... args)