rippled
Loading...
Searching...
No Matches
GRPCServer.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2020 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/app/main/GRPCServer.h>
21#include <xrpld/core/ConfigSections.h>
22
23#include <xrpl/beast/core/CurrentThreadName.h>
24#include <xrpl/beast/net/IPAddressConversion.h>
25#include <xrpl/resource/Fees.h>
26
27namespace ripple {
28
29namespace {
30
31// helper function. converts string to endpoint. handles ipv4 and ipv6, with or
32// without port, with or without prepended scheme
34getEndpoint(std::string const& peer)
35{
36 try
37 {
38 std::size_t first = peer.find_first_of(":");
39 std::size_t last = peer.find_last_of(":");
40 std::string peerClean(peer);
41 if (first != last)
42 {
43 peerClean = peer.substr(first + 1);
44 }
45
48 if (endpoint)
49 return beast::IP::to_asio_endpoint(endpoint.value());
50 }
51 catch (std::exception const&)
52 {
53 }
54 return {};
55}
56
57} // namespace
58
59template <class Request, class Response>
61 org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService& service,
62 grpc::ServerCompletionQueue& cq,
63 Application& app,
67 RPC::Condition requiredCondition,
68 Resource::Charge loadType,
69 std::vector<boost::asio::ip::address> const& secureGatewayIPs)
70 : service_(service)
71 , cq_(cq)
72 , finished_(false)
73 , app_(app)
74 , responder_(&ctx_)
75 , bindListener_(std::move(bindListener))
76 , handler_(std::move(handler))
77 , forward_(std::move(forward))
78 , requiredCondition_(std::move(requiredCondition))
79 , loadType_(std::move(loadType))
80 , secureGatewayIPs_(secureGatewayIPs)
81{
82 // Bind a listener. When a request is received, "this" will be returned
83 // from CompletionQueue::Next
85}
86
87template <class Request, class Response>
90{
91 return std::make_shared<CallData<Request, Response>>(
93 cq_,
94 app_,
95 bindListener_,
96 handler_,
97 forward_,
98 requiredCondition_,
99 loadType_,
101}
102
103template <class Request, class Response>
104void
106{
107 // sanity check
108 BOOST_ASSERT(!finished_);
109
111 this->shared_from_this();
112
113 // Need to set finished to true before processing the response,
114 // because as soon as the response is posted to the completion
115 // queue (via responder_.Finish(...) or responder_.FinishWithError(...)),
116 // the CallData object is returned as a tag in handleRpcs().
117 // handleRpcs() checks the finished variable, and if true, destroys
118 // the object. Setting finished to true before calling process
119 // ensures that finished is always true when this CallData object
120 // is returned as a tag in handleRpcs(), after sending the response
121 finished_ = true;
122 auto coro = app_.getJobQueue().postCoro(
124 "gRPC-Client",
125 [thisShared](std::shared_ptr<JobQueue::Coro> coro) {
126 thisShared->process(coro);
127 });
128
129 // If coro is null, then the JobQueue has already been shutdown
130 if (!coro)
131 {
132 grpc::Status status{
133 grpc::StatusCode::INTERNAL, "Job Queue is already stopped"};
134 responder_.FinishWithError(status, this);
135 }
136}
137
138template <class Request, class Response>
139void
142{
143 try
144 {
145 auto usage = getUsage();
146 bool isUnlimited = clientIsUnlimited();
147 if (!isUnlimited && usage.disconnect(app_.journal("gRPCServer")))
148 {
149 grpc::Status status{
150 grpc::StatusCode::RESOURCE_EXHAUSTED,
151 "usage balance exceeds threshold"};
152 responder_.FinishWithError(status, this);
153 }
154 else
155 {
156 auto loadType = getLoadType();
157 usage.charge(loadType);
158 auto role = getRole(isUnlimited);
159
160 {
161 std::stringstream toLog;
162 toLog << "role = " << (int)role;
163
164 toLog << " address = ";
165 if (auto clientIp = getClientIpAddress())
166 toLog << clientIp.value();
167
168 toLog << " user = ";
169 if (auto user = getUser())
170 toLog << user.value();
171 toLog << " isUnlimited = " << isUnlimited;
172
173 JLOG(app_.journal("GRPCServer::Calldata").debug())
174 << toLog.str();
175 }
176
178 {app_.journal("gRPCServer"),
179 app_,
180 loadType,
181 app_.getOPs(),
183 usage,
184 role,
185 coro,
187 apiVersion},
188 request_};
189
190 // Make sure we can currently handle the rpc
191 error_code_i conditionMetRes =
192 RPC::conditionMet(requiredCondition_, context);
193
194 if (conditionMetRes != rpcSUCCESS)
195 {
196 RPC::ErrorInfo errorInfo = RPC::get_error_info(conditionMetRes);
197 grpc::Status status{
198 grpc::StatusCode::FAILED_PRECONDITION,
199 errorInfo.message.c_str()};
200 responder_.FinishWithError(status, this);
201 }
202 else
203 {
204 std::pair<Response, grpc::Status> result = handler_(context);
205 setIsUnlimited(result.first, isUnlimited);
206 responder_.Finish(result.first, result.second, this);
207 }
208 }
209 }
210 catch (std::exception const& ex)
211 {
212 grpc::Status status{grpc::StatusCode::INTERNAL, ex.what()};
213 responder_.FinishWithError(status, this);
214 }
215}
216
217template <class Request, class Response>
218bool
220{
221 return finished_;
222}
223
224template <class Request, class Response>
227{
228 return loadType_;
229}
230
231template <class Request, class Response>
232Role
234{
235 if (isUnlimited)
236 return Role::IDENTIFIED;
237 else
238 return Role::USER;
239}
240
241template <class Request, class Response>
244{
245 if (auto descriptor = Request::GetDescriptor()->FindFieldByName("user"))
246 {
247 std::string user =
248 Request::GetReflection()->GetString(request_, descriptor);
249 if (!user.empty())
250 {
251 return user;
252 }
253 }
254 return {};
255}
256
257template <class Request, class Response>
260{
261 auto endpoint = getClientEndpoint();
262 if (endpoint)
263 return endpoint->address();
264 return {};
265}
266
267template <class Request, class Response>
270{
271 return ripple::getEndpoint(ctx_.peer());
272}
273
274template <class Request, class Response>
275bool
277{
278 if (!getUser())
279 return false;
280 auto clientIp = getClientIpAddress();
281 if (clientIp)
282 {
283 for (auto& ip : secureGatewayIPs_)
284 {
285 if (ip == clientIp)
286 return true;
287 }
288 }
289 return false;
290}
291
292template <class Request, class Response>
293void
295 Response& response,
296 bool isUnlimited)
297{
298 if (isUnlimited)
299 {
300 if (auto descriptor =
301 Response::GetDescriptor()->FindFieldByName("is_unlimited"))
302 {
303 Response::GetReflection()->SetBool(&response, descriptor, true);
304 }
305 }
306}
307
308template <class Request, class Response>
311{
312 auto endpoint = getClientEndpoint();
313 if (endpoint)
315 beast::IP::from_asio(endpoint.value()));
316 Throw<std::runtime_error>("Failed to get client endpoint");
317}
318
320 : app_(app), journal_(app_.journal("gRPC Server"))
321{
322 // if present, get endpoint from config
323 if (app_.config().exists(SECTION_PORT_GRPC))
324 {
325 Section const& section = app_.config().section(SECTION_PORT_GRPC);
326
327 auto const optIp = section.get("ip");
328 if (!optIp)
329 return;
330
331 auto const optPort = section.get("port");
332 if (!optPort)
333 return;
334 try
335 {
336 boost::asio::ip::tcp::endpoint endpoint(
337 boost::asio::ip::make_address(*optIp), std::stoi(*optPort));
338
340 ss << endpoint;
341 serverAddress_ = ss.str();
342 }
343 catch (std::exception const&)
344 {
345 JLOG(journal_.error()) << "Error setting grpc server address";
346 Throw<std::runtime_error>("Error setting grpc server address");
347 }
348
349 auto const optSecureGateway = section.get("secure_gateway");
350 if (optSecureGateway)
351 {
352 try
353 {
354 std::stringstream ss{*optSecureGateway};
355 std::string ip;
356 while (std::getline(ss, ip, ','))
357 {
358 boost::algorithm::trim(ip);
359 auto const addr = boost::asio::ip::make_address(ip);
360
361 if (addr.is_unspecified())
362 {
363 JLOG(journal_.error())
364 << "Can't pass unspecified IP in "
365 << "secure_gateway section of port_grpc";
366 Throw<std::runtime_error>(
367 "Unspecified IP in secure_gateway section");
368 }
369
371 }
372 }
373 catch (std::exception const&)
374 {
375 JLOG(journal_.error())
376 << "Error parsing secure gateway IPs for grpc server";
377 Throw<std::runtime_error>(
378 "Error parsing secure_gateway section");
379 }
380 }
381 }
382}
383
384void
386{
387 JLOG(journal_.debug()) << "Shutting down";
388
389 // The below call cancels all "listeners" (CallData objects that are waiting
390 // for a request, as opposed to processing a request), and blocks until all
391 // requests being processed are completed. CallData objects in the midst of
392 // processing requests need to actually send data back to the client, via
393 // responder_.Finish(...) or responder_.FinishWithError(...), for this call
394 // to unblock. Each cancelled listener is returned via cq_.Next(...) with ok
395 // set to false
396 server_->Shutdown();
397 JLOG(journal_.debug()) << "Server has been shutdown";
398
399 // Always shutdown the completion queue after the server. This call allows
400 // cq_.Next() to return false, once all events posted to the completion
401 // queue have been processed. See handleRpcs() for more details.
402 cq_->Shutdown();
403 JLOG(journal_.debug()) << "Completion Queue has been shutdown";
404}
405
406void
408{
409 // This collection should really be an unordered_set. However, to delete
410 // from the unordered_set, we need a shared_ptr, but cq_.Next() (see below
411 // while loop) sets the tag to a raw pointer.
413
414 auto erase = [&requests](Processor* ptr) {
415 auto it = std::find_if(
416 requests.begin(),
417 requests.end(),
418 [ptr](std::shared_ptr<Processor>& sPtr) {
419 return sPtr.get() == ptr;
420 });
421 BOOST_ASSERT(it != requests.end());
422 it->swap(requests.back());
423 requests.pop_back();
424 };
425
426 void* tag; // uniquely identifies a request.
427 bool ok;
428 // Block waiting to read the next event from the completion queue. The
429 // event is uniquely identified by its tag, which in this case is the
430 // memory address of a CallData instance.
431 // The return value of Next should always be checked. This return value
432 // tells us whether there is any kind of event or cq_ is shutting down.
433 // When cq_.Next(...) returns false, all work has been completed and the
434 // loop can exit. When the server is shutdown, each CallData object that is
435 // listening for a request is forceably cancelled, and is returned by
436 // cq_->Next() with ok set to false. Then, each CallData object processing
437 // a request must complete (by sending data to the client), each of which
438 // will be returned from cq_->Next() with ok set to true. After all
439 // cancelled listeners and all CallData objects processing requests are
440 // returned via cq_->Next(), cq_->Next() will return false, causing the
441 // loop to exit.
442 while (cq_->Next(&tag, &ok))
443 {
444 auto ptr = static_cast<Processor*>(tag);
445 JLOG(journal_.trace()) << "Processing CallData object."
446 << " ptr = " << ptr << " ok = " << ok;
447
448 if (!ok)
449 {
450 JLOG(journal_.debug())
451 << "Request listener cancelled. " << "Destroying object";
452 erase(ptr);
453 }
454 else
455 {
456 if (!ptr->isFinished())
457 {
458 JLOG(journal_.debug()) << "Received new request. Processing";
459 // ptr is now processing a request, so create a new CallData
460 // object to handle additional requests
461 auto cloned = ptr->clone();
462 requests.push_back(cloned);
463 // process the request
464 ptr->process();
465 }
466 else
467 {
468 JLOG(journal_.debug()) << "Sent response. Destroying object";
469 erase(ptr);
470 }
471 }
472 }
473 JLOG(journal_.debug()) << "Completion Queue drained";
474}
475
476// create a CallData instance for each RPC
479{
481
482 auto addToRequests = [&requests](auto callData) {
483 requests.push_back(std::move(callData));
484 };
485
486 {
487 using cd = CallData<
488 org::xrpl::rpc::v1::GetLedgerRequest,
489 org::xrpl::rpc::v1::GetLedgerResponse>;
490
491 addToRequests(std::make_shared<cd>(
492 service_,
493 *cq_,
494 app_,
495 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
496 RequestGetLedger,
498 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedger,
502 }
503 {
504 using cd = CallData<
505 org::xrpl::rpc::v1::GetLedgerDataRequest,
506 org::xrpl::rpc::v1::GetLedgerDataResponse>;
507
508 addToRequests(std::make_shared<cd>(
509 service_,
510 *cq_,
511 app_,
512 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
513 RequestGetLedgerData,
515 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerData,
519 }
520 {
521 using cd = CallData<
522 org::xrpl::rpc::v1::GetLedgerDiffRequest,
523 org::xrpl::rpc::v1::GetLedgerDiffResponse>;
524
525 addToRequests(std::make_shared<cd>(
526 service_,
527 *cq_,
528 app_,
529 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
530 RequestGetLedgerDiff,
532 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerDiff,
536 }
537 {
538 using cd = CallData<
539 org::xrpl::rpc::v1::GetLedgerEntryRequest,
540 org::xrpl::rpc::v1::GetLedgerEntryResponse>;
541
542 addToRequests(std::make_shared<cd>(
543 service_,
544 *cq_,
545 app_,
546 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
547 RequestGetLedgerEntry,
549 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerEntry,
553 }
554 return requests;
555}
556
557bool
559{
560 // if config does not specify a grpc server address, don't start
561 if (serverAddress_.empty())
562 return false;
563
564 JLOG(journal_.info()) << "Starting gRPC server at " << serverAddress_;
565
566 grpc::ServerBuilder builder;
567
568 // Listen on the given address without any authentication mechanism.
569 // Actually binded port will be returned into "port" variable.
570 int port = 0;
571 builder.AddListeningPort(
572 serverAddress_, grpc::InsecureServerCredentials(), &port);
573 // Register "service_" as the instance through which we'll communicate with
574 // clients. In this case it corresponds to an *asynchronous* service.
575 builder.RegisterService(&service_);
576 // Get hold of the completion queue used for the asynchronous communication
577 // with the gRPC runtime.
578 cq_ = builder.AddCompletionQueue();
579 // Finally assemble the server.
580 server_ = builder.BuildAndStart();
581 serverPort_ = static_cast<std::uint16_t>(port);
582
583 return static_cast<bool>(serverPort_);
584}
585
586boost::asio::ip::tcp::endpoint
588{
589 std::string const addr =
591 return boost::asio::ip::tcp::endpoint(
592 boost::asio::ip::make_address(addr), serverPort_);
593}
594
595bool
597{
598 // Start the server and setup listeners
599 if (running_ = impl_.start(); running_)
600 {
601 thread_ = std::thread([this]() {
602 // Start the event loop and begin handling requests
603 beast::setCurrentThreadName("rippled: grpc");
604 this->impl_.handleRpcs();
605 });
606 }
607 return running_;
608}
609
610void
612{
613 if (running_)
614 {
615 impl_.shutdown();
616 thread_.join();
617 running_ = false;
618 }
619}
620
622{
623 XRPL_ASSERT(!running_, "ripple::GRPCServer::~GRPCServer : is not running");
624}
625
626boost::asio::ip::tcp::endpoint
628{
629 return impl_.getEndpoint();
630}
631
632} // namespace ripple
T back(T... args)
T begin(T... args)
constexpr const char * c_str() const
Definition: json_value.h:75
static std::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
Definition: IPEndpoint.cpp:45
Stream error() const
Definition: Journal.h:346
Stream debug() const
Definition: Journal.h:328
Stream info() const
Definition: Journal.h:334
Stream trace() const
Severity stream access functions.
Definition: Journal.h:322
virtual Config & config()=0
virtual beast::Journal journal(std::string const &name)=0
virtual JobQueue & getJobQueue()=0
virtual Resource::Manager & getResourceManager()=0
virtual NetworkOPs & getOPs()=0
virtual LedgerMaster & getLedgerMaster()=0
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Section & section(std::string const &name)
Returns the section with the given name.
Resource::Consumer getUsage()
Definition: GRPCServer.cpp:310
std::optional< std::string > getUser()
Definition: GRPCServer.cpp:243
grpc::ServerCompletionQueue & cq_
Definition: GRPCServer.h:161
grpc::ServerContext ctx_
Definition: GRPCServer.h:166
Resource::Charge getLoadType()
Definition: GRPCServer.cpp:226
void setIsUnlimited(Response &response, bool isUnlimited)
Definition: GRPCServer.cpp:294
std::optional< boost::asio::ip::address > getClientIpAddress()
Definition: GRPCServer.cpp:259
BindListener< Request, Response > bindListener_
Definition: GRPCServer.h:184
grpc::ServerAsyncResponseWriter< Response > responder_
Definition: GRPCServer.h:181
Role getRole(bool isUnlimited)
Definition: GRPCServer.cpp:233
std::shared_ptr< Processor > clone() override
Definition: GRPCServer.cpp:89
CallData(org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService &service, grpc::ServerCompletionQueue &cq, Application &app, BindListener< Request, Response > bindListener, Handler< Request, Response > handler, Forward< Request, Response > forward, RPC::Condition requiredCondition, Resource::Charge loadType, std::vector< boost::asio::ip::address > const &secureGatewayIPs)
Definition: GRPCServer.cpp:60
std::optional< boost::asio::ip::tcp::endpoint > getClientEndpoint()
Definition: GRPCServer.cpp:269
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService & service_
Definition: GRPCServer.h:158
virtual void process() override
Definition: GRPCServer.cpp:105
virtual bool isFinished() override
Definition: GRPCServer.cpp:219
std::string serverAddress_
Definition: GRPCServer.h:85
Application & app_
Definition: GRPCServer.h:83
std::vector< std::shared_ptr< Processor > > setupListeners()
Definition: GRPCServer.cpp:478
std::unique_ptr< grpc::Server > server_
Definition: GRPCServer.h:81
std::unique_ptr< grpc::ServerCompletionQueue > cq_
Definition: GRPCServer.h:74
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService service_
Definition: GRPCServer.h:79
GRPCServerImpl(Application &app)
Definition: GRPCServer.cpp:319
static unsigned constexpr apiVersion
Definition: GRPCServer.h:111
boost::asio::ip::tcp::endpoint getEndpoint() const
Definition: GRPCServer.cpp:587
std::vector< boost::asio::ip::address > secureGatewayIPs_
Definition: GRPCServer.h:88
std::uint16_t serverPort_
Definition: GRPCServer.h:86
beast::Journal journal_
Definition: GRPCServer.h:90
std::thread thread_
Definition: GRPCServer.h:325
boost::asio::ip::tcp::endpoint getEndpoint() const
Definition: GRPCServer.cpp:627
GRPCServerImpl impl_
Definition: GRPCServer.h:324
std::shared_ptr< InfoSub > pointer
Definition: InfoSub.h:54
std::shared_ptr< Coro > postCoro(JobType t, std::string const &name, F &&f)
Creates a coroutine and adds a job to the queue which will run it.
Definition: JobQueue.h:411
A consumption charge.
Definition: Charge.h:31
An endpoint that consumes resources.
Definition: Consumer.h:35
virtual Consumer newInboundEndpoint(beast::IP::Endpoint const &address)=0
Create a new endpoint keyed by inbound IP address or the forwarded IP if proxied.
Holds a collection of configuration values.
Definition: BasicConfig.h:45
std::optional< T > get(std::string const &name) const
Definition: BasicConfig.h:140
T emplace_back(T... args)
T empty(T... args)
T end(T... args)
T find_first_of(T... args)
T find_if(T... args)
T find_last_of(T... args)
T getline(T... args)
T join(T... args)
boost::asio::ip::tcp::endpoint to_asio_endpoint(Endpoint const &endpoint)
Convert to asio::ip::tcp::endpoint.
Endpoint from_asio(boost::asio::ip::address const &address)
Convert to Endpoint.
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
@ NO_CONDITION
Definition: Handler.h:39
error_code_i conditionMet(Condition condition_required, T &context)
Definition: Handler.h:80
ErrorInfo const & get_error_info(error_code_i code)
Returns an ErrorInfo that reflects the error code.
Definition: ErrorCodes.cpp:179
Charge const feeMediumBurdenRPC
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: algorithm.h:26
error_code_i
Definition: ErrorCodes.h:40
@ rpcSUCCESS
Definition: ErrorCodes.h:44
bool isUnlimited(Role const &role)
ADMIN and IDENTIFIED roles shall have unlimited resources.
Definition: Role.cpp:125
std::pair< org::xrpl::rpc::v1::GetLedgerResponse, grpc::Status > doLedgerGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerRequest > &context)
std::pair< org::xrpl::rpc::v1::GetLedgerEntryResponse, grpc::Status > doLedgerEntryGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerEntryRequest > &context)
void erase(STObject &st, TypedField< U > const &f)
Remove a field in an STObject.
Definition: STExchange.h:172
Role
Indicates the level of administrative permission to grant.
Definition: Role.h:44
@ jtRPC
Definition: Job.h:52
std::pair< org::xrpl::rpc::v1::GetLedgerDiffResponse, grpc::Status > doLedgerDiffGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerDiffRequest > &context)
Definition: LedgerDiff.cpp:6
std::pair< org::xrpl::rpc::v1::GetLedgerDataResponse, grpc::Status > doLedgerDataGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetLedgerDataRequest > &context)
Definition: LedgerData.cpp:136
STL namespace.
T pop_back(T... args)
T push_back(T... args)
T stoi(T... args)
T str(T... args)
Maps an rpc error code to its token, default message, and HTTP status.
Definition: ErrorCodes.h:179
Json::StaticString message
Definition: ErrorCodes.h:211
T substr(T... args)
T value(T... args)
T what(T... args)