rippled
GRPCServer.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2020 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/main/GRPCServer.h>
21 #include <ripple/resource/Fees.h>
22 
23 namespace ripple {
24 
25 namespace {
26 
27 // helper function. strips scheme from endpoint string
29 getEndpoint(std::string const& peer)
30 {
31  std::size_t first = peer.find_first_of(":");
32  std::size_t last = peer.find_last_of(":");
33  std::string peerClean(peer);
34  if (first != last)
35  {
36  peerClean = peer.substr(first + 1);
37  }
38  return peerClean;
39 }
40 } // namespace
41 
42 template <class Request, class Response>
44  org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService& service,
45  grpc::ServerCompletionQueue& cq,
46  Application& app,
49  RPC::Condition requiredCondition,
50  Resource::Charge loadType)
51  : service_(service)
52  , cq_(cq)
53  , finished_(false)
54  , app_(app)
55  , responder_(&ctx_)
56  , bindListener_(std::move(bindListener))
57  , handler_(std::move(handler))
58  , requiredCondition_(std::move(requiredCondition))
59  , loadType_(std::move(loadType))
60 {
61  // Bind a listener. When a request is received, "this" will be returned
62  // from CompletionQueue::Next
64 }
65 
66 template <class Request, class Response>
69 {
70  return std::make_shared<CallData<Request, Response>>(
71  service_,
72  cq_,
73  app_,
74  bindListener_,
75  handler_,
76  requiredCondition_,
77  loadType_);
78 }
79 
80 template <class Request, class Response>
81 void
83 {
84  // sanity check
85  BOOST_ASSERT(!finished_);
86 
88  this->shared_from_this();
89 
90  // Need to set finished to true before processing the response,
91  // because as soon as the response is posted to the completion
92  // queue (via responder_.Finish(...) or responder_.FinishWithError(...)),
93  // the CallData object is returned as a tag in handleRpcs().
94  // handleRpcs() checks the finished variable, and if true, destroys
95  // the object. Setting finished to true before calling process
96  // ensures that finished is always true when this CallData object
97  // is returned as a tag in handleRpcs(), after sending the response
98  finished_ = true;
99  auto coro = app_.getJobQueue().postCoro(
100  JobType::jtRPC,
101  "gRPC-Client",
102  [thisShared](std::shared_ptr<JobQueue::Coro> coro) {
103  thisShared->process(coro);
104  });
105 
106  // If coro is null, then the JobQueue has already been shutdown
107  if (!coro)
108  {
109  grpc::Status status{
110  grpc::StatusCode::INTERNAL, "Job Queue is already stopped"};
111  responder_.FinishWithError(status, this);
112  }
113 }
114 
115 template <class Request, class Response>
116 void
119 {
120  try
121  {
122  auto usage = getUsage();
123  if (usage.disconnect())
124  {
125  grpc::Status status{
126  grpc::StatusCode::RESOURCE_EXHAUSTED,
127  "usage balance exceeds threshhold"};
128  responder_.FinishWithError(status, this);
129  }
130  else
131  {
132  auto loadType = getLoadType();
133  usage.charge(loadType);
134  auto role = getRole();
135 
137  {app_.journal("gRPCServer"),
138  app_,
139  loadType,
140  app_.getOPs(),
142  usage,
143  role,
144  coro,
146  apiVersion},
147  request_};
148 
149  // Make sure we can currently handle the rpc
150  error_code_i conditionMetRes =
151  RPC::conditionMet(requiredCondition_, context);
152 
153  if (conditionMetRes != rpcSUCCESS)
154  {
155  RPC::ErrorInfo errorInfo = RPC::get_error_info(conditionMetRes);
156  grpc::Status status{
157  grpc::StatusCode::FAILED_PRECONDITION,
158  errorInfo.message.c_str()};
159  responder_.FinishWithError(status, this);
160  }
161  else
162  {
163  std::pair<Response, grpc::Status> result = handler_(context);
164  responder_.Finish(result.first, result.second, this);
165  }
166  }
167  }
168  catch (std::exception const& ex)
169  {
170  grpc::Status status{grpc::StatusCode::INTERNAL, ex.what()};
171  responder_.FinishWithError(status, this);
172  }
173 }
174 
175 template <class Request, class Response>
176 bool
178 {
179  return finished_;
180 }
181 
182 template <class Request, class Response>
185 {
186  return loadType_;
187 }
188 
189 template <class Request, class Response>
190 Role
192 {
193  return Role::USER;
194 }
195 
196 template <class Request, class Response>
199 {
200  std::string peer = getEndpoint(ctx_.peer());
201  boost::optional<beast::IP::Endpoint> endpoint =
203  return app_.getResourceManager().newInboundEndpoint(endpoint.get());
204 }
205 
207  : app_(app), journal_(app_.journal("gRPC Server"))
208 {
209  // if present, get endpoint from config
210  if (app_.config().exists("port_grpc"))
211  {
212  Section section = app_.config().section("port_grpc");
213 
214  std::pair<std::string, bool> ipPair = section.find("ip");
215  if (!ipPair.second)
216  return;
217 
218  std::pair<std::string, bool> portPair = section.find("port");
219  if (!portPair.second)
220  return;
221  try
222  {
223  beast::IP::Endpoint endpoint(
224  boost::asio::ip::make_address(ipPair.first),
225  std::stoi(portPair.first));
226 
227  serverAddress_ = endpoint.to_string();
228  }
229  catch (std::exception const&)
230  {
231  }
232  }
233 }
234 
235 void
237 {
238  JLOG(journal_.debug()) << "Shutting down";
239 
240  // The below call cancels all "listeners" (CallData objects that are waiting
241  // for a request, as opposed to processing a request), and blocks until all
242  // requests being processed are completed. CallData objects in the midst of
243  // processing requests need to actually send data back to the client, via
244  // responder_.Finish(...) or responder_.FinishWithError(...), for this call
245  // to unblock. Each cancelled listener is returned via cq_.Next(...) with ok
246  // set to false
247  server_->Shutdown();
248  JLOG(journal_.debug()) << "Server has been shutdown";
249 
250  // Always shutdown the completion queue after the server. This call allows
251  // cq_.Next() to return false, once all events posted to the completion
252  // queue have been processed. See handleRpcs() for more details.
253  cq_->Shutdown();
254  JLOG(journal_.debug()) << "Completion Queue has been shutdown";
255 }
256 
257 void
259 {
260  // This collection should really be an unordered_set. However, to delete
261  // from the unordered_set, we need a shared_ptr, but cq_.Next() (see below
262  // while loop) sets the tag to a raw pointer.
264 
265  auto erase = [&requests](Processor* ptr) {
266  auto it = std::find_if(
267  requests.begin(),
268  requests.end(),
269  [ptr](std::shared_ptr<Processor>& sPtr) {
270  return sPtr.get() == ptr;
271  });
272  BOOST_ASSERT(it != requests.end());
273  it->swap(requests.back());
274  requests.pop_back();
275  };
276 
277  void* tag; // uniquely identifies a request.
278  bool ok;
279  // Block waiting to read the next event from the completion queue. The
280  // event is uniquely identified by its tag, which in this case is the
281  // memory address of a CallData instance.
282  // The return value of Next should always be checked. This return value
283  // tells us whether there is any kind of event or cq_ is shutting down.
284  // When cq_.Next(...) returns false, all work has been completed and the
285  // loop can exit. When the server is shutdown, each CallData object that is
286  // listening for a request is forceably cancelled, and is returned by
287  // cq_->Next() with ok set to false. Then, each CallData object processing
288  // a request must complete (by sending data to the client), each of which
289  // will be returned from cq_->Next() with ok set to true. After all
290  // cancelled listeners and all CallData objects processing requests are
291  // returned via cq_->Next(), cq_->Next() will return false, causing the
292  // loop to exit.
293  while (cq_->Next(&tag, &ok))
294  {
295  auto ptr = static_cast<Processor*>(tag);
296  JLOG(journal_.trace()) << "Processing CallData object."
297  << " ptr = " << ptr << " ok = " << ok;
298 
299  if (!ok)
300  {
301  JLOG(journal_.debug()) << "Request listener cancelled. "
302  << "Destroying object";
303  erase(ptr);
304  }
305  else
306  {
307  if (!ptr->isFinished())
308  {
309  JLOG(journal_.debug()) << "Received new request. Processing";
310  // ptr is now processing a request, so create a new CallData
311  // object to handle additional requests
312  auto cloned = ptr->clone();
313  requests.push_back(cloned);
314  // process the request
315  ptr->process();
316  }
317  else
318  {
319  JLOG(journal_.debug()) << "Sent response. Destroying object";
320  erase(ptr);
321  }
322  }
323  }
324  JLOG(journal_.debug()) << "Completion Queue drained";
325 }
326 
327 // create a CallData instance for each RPC
330 {
332 
333  auto addToRequests = [&requests](auto callData) {
334  requests.push_back(std::move(callData));
335  };
336 
337  {
338  using cd = CallData<
339  org::xrpl::rpc::v1::GetFeeRequest,
340  org::xrpl::rpc::v1::GetFeeResponse>;
341 
342  addToRequests(std::make_shared<cd>(
343  service_,
344  *cq_,
345  app_,
346  &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
347  RequestGetFee,
348  doFeeGrpc,
351  }
352  {
353  using cd = CallData<
354  org::xrpl::rpc::v1::GetAccountInfoRequest,
355  org::xrpl::rpc::v1::GetAccountInfoResponse>;
356 
357  addToRequests(std::make_shared<cd>(
358  service_,
359  *cq_,
360  app_,
361  &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
362  RequestGetAccountInfo,
366  }
367  {
368  using cd = CallData<
369  org::xrpl::rpc::v1::GetTransactionRequest,
370  org::xrpl::rpc::v1::GetTransactionResponse>;
371 
372  addToRequests(std::make_shared<cd>(
373  service_,
374  *cq_,
375  app_,
376  &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
377  RequestGetTransaction,
378  doTxGrpc,
381  }
382  {
383  using cd = CallData<
384  org::xrpl::rpc::v1::SubmitTransactionRequest,
385  org::xrpl::rpc::v1::SubmitTransactionResponse>;
386 
387  addToRequests(std::make_shared<cd>(
388  service_,
389  *cq_,
390  app_,
391  &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
392  RequestSubmitTransaction,
393  doSubmitGrpc,
396  }
397 
398  {
399  using cd = CallData<
400  org::xrpl::rpc::v1::GetAccountTransactionHistoryRequest,
401  org::xrpl::rpc::v1::GetAccountTransactionHistoryResponse>;
402 
403  addToRequests(std::make_shared<cd>(
404  service_,
405  *cq_,
406  app_,
407  &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
408  RequestGetAccountTransactionHistory,
412  }
413  return requests;
414 };
415 
416 bool
418 {
419  // if config does not specify a grpc server address, don't start
420  if (serverAddress_.empty())
421  return false;
422 
423  JLOG(journal_.info()) << "Starting gRPC server at " << serverAddress_;
424 
425  grpc::ServerBuilder builder;
426  // Listen on the given address without any authentication mechanism.
427  builder.AddListeningPort(serverAddress_, grpc::InsecureServerCredentials());
428  // Register "service_" as the instance through which we'll communicate with
429  // clients. In this case it corresponds to an *asynchronous* service.
430  builder.RegisterService(&service_);
431  // Get hold of the completion queue used for the asynchronous communication
432  // with the gRPC runtime.
433  cq_ = builder.AddCompletionQueue();
434  // Finally assemble the server.
435  server_ = builder.BuildAndStart();
436 
437  return true;
438 }
439 
440 void
442 {
443  // Start the server and setup listeners
444  if ((running_ = impl_.start()))
445  {
446  thread_ = std::thread([this]() {
447  // Start the event loop and begin handling requests
448  this->impl_.handleRpcs();
449  });
450  }
451 }
452 
454 {
455  if (running_)
456  {
457  impl_.shutdown();
458  thread_.join();
459  }
460 }
461 
462 } // namespace ripple
ripple::Resource::Manager::newInboundEndpoint
virtual Consumer newInboundEndpoint(beast::IP::Endpoint const &address)=0
Create a new endpoint keyed by inbound IP address or the forwarded IP if proxied.
ripple::Section
Holds a collection of configuration values.
Definition: BasicConfig.h:43
ripple::Application
Definition: Application.h:97
ripple::Processor
Definition: GRPCServer.h:41
std::string
STL class.
std::shared_ptr
STL class.
ripple::JobQueue::postCoro
std::shared_ptr< Coro > postCoro(JobType t, std::string const &name, F &&f)
Creates a coroutine and adds a job to the queue which will run it.
Definition: JobQueue.h:427
ripple::GRPCServer::impl_
GRPCServerImpl impl_
Definition: GRPCServer.h:253
std::exception
STL class.
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::GRPCServerImpl::CallData::clone
std::shared_ptr< Processor > clone() override
Definition: GRPCServer.cpp:68
ripple::RPC::get_error_info
ErrorInfo const & get_error_info(error_code_i code)
Returns an ErrorInfo that reflects the error code.
Definition: ErrorCodes.cpp:195
ripple::Resource::feeMediumBurdenRPC
const Charge feeMediumBurdenRPC
beast::IP::Endpoint::to_string
std::string to_string() const
Returns a string representing the endpoint.
Definition: IPEndpoint.cpp:54
std::pair
ripple::GRPCServerImpl::apiVersion
static constexpr unsigned apiVersion
Definition: GRPCServer.h:109
ripple::GRPCServerImpl::start
bool start()
Definition: GRPCServer.cpp:417
std::vector
STL class.
std::find_if
T find_if(T... args)
ripple::GRPCServerImpl::shutdown
void shutdown()
Definition: GRPCServer.cpp:236
ripple::GRPCServerImpl::CallData::request_
Request request_
Definition: GRPCServer.h:165
ripple::GRPCServerImpl::service_
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService service_
Definition: GRPCServer.h:80
ripple::doAccountTxGrpc
std::pair< org::xrpl::rpc::v1::GetAccountTransactionHistoryResponse, grpc::Status > doAccountTxGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetAccountTransactionHistoryRequest > &context)
Definition: AccountTx.cpp:581
ripple::GRPCServerImpl::CallData::cq_
grpc::ServerCompletionQueue & cq_
Definition: GRPCServer.h:148
ripple::GRPCServer::running_
bool running_
Definition: GRPCServer.h:255
ripple::GRPCServerImpl::server_
std::unique_ptr< grpc::Server > server_
Definition: GRPCServer.h:82
ripple::Resource::feeReferenceRPC
const Charge feeReferenceRPC
std::vector::back
T back(T... args)
std::function
ripple::GRPCServerImpl::handleRpcs
void handleRpcs()
Definition: GRPCServer.cpp:258
Json::StaticString::c_str
constexpr const char * c_str() const
Definition: json_value.h:73
ripple::GRPCServerImpl::CallData::isFinished
virtual bool isFinished() override
Definition: GRPCServer.cpp:177
ripple::Application::getOPs
virtual NetworkOPs & getOPs()=0
ripple::error_code_i
error_code_i
Definition: ErrorCodes.h:40
std::vector::push_back
T push_back(T... args)
ripple::RPC::ErrorInfo::message
Json::StaticString message
Definition: ErrorCodes.h:175
ripple::erase
void erase(STObject &st, TypedField< U > const &f)
Remove a field in an STObject.
Definition: STExchange.h:171
ripple::GRPCServer::thread_
std::thread thread_
Definition: GRPCServer.h:254
ripple::GRPCServerImpl::CallData::service_
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService & service_
Definition: GRPCServer.h:145
std::stoi
T stoi(T... args)
ripple::rpcSUCCESS
@ rpcSUCCESS
Definition: ErrorCodes.h:44
ripple::GRPCServerImpl::cq_
std::unique_ptr< grpc::ServerCompletionQueue > cq_
Definition: GRPCServer.h:75
ripple::GRPCServerImpl::CallData::getUsage
Resource::Consumer getUsage()
Definition: GRPCServer.cpp:198
std::thread
STL class.
ripple::InfoSub::pointer
std::shared_ptr< InfoSub > pointer
Definition: InfoSub.h:49
ripple::Application::getLedgerMaster
virtual LedgerMaster & getLedgerMaster()=0
ripple::Role::USER
@ USER
ripple::RPC::NEEDS_CURRENT_LEDGER
@ NEEDS_CURRENT_LEDGER
Definition: Handler.h:42
ripple::Application::config
virtual Config & config()=0
ripple::RPC::GRPCContext
Definition: Context.h:70
std::string::find_last_of
T find_last_of(T... args)
ripple::Application::getJobQueue
virtual JobQueue & getJobQueue()=0
ripple::doTxGrpc
std::pair< org::xrpl::rpc::v1::GetTransactionResponse, grpc::Status > doTxGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetTransactionRequest > &context)
Definition: Tx.cpp:389
ripple::RPC::NO_CONDITION
@ NO_CONDITION
Definition: Handler.h:40
beast::Journal::info
Stream info() const
Definition: Journal.h:321
ripple::GRPCServerImpl::GRPCServerImpl
GRPCServerImpl(Application &app)
Definition: GRPCServer.cpp:206
ripple::RPC::Condition
Condition
Definition: Handler.h:39
ripple::RPC::ErrorInfo
Maps an rpc error code to its token and default message.
Definition: ErrorCodes.h:157
ripple::GRPCServerImpl::CallData::getLoadType
Resource::Charge getLoadType()
Definition: GRPCServer.cpp:184
std::vector::pop_back
T pop_back(T... args)
ripple::Section::find
std::pair< std::string, bool > find(std::string const &name) const
Retrieve a key/value pair.
Definition: BasicConfig.cpp:113
ripple::GRPCServerImpl::CallData::bindListener_
BindListener< Request, Response > bindListener_
Definition: GRPCServer.h:174
ripple::Application::getResourceManager
virtual Resource::Manager & getResourceManager()=0
std::string::substr
T substr(T... args)
ripple::GRPCServer::~GRPCServer
~GRPCServer()
Definition: GRPCServer.cpp:453
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::GRPCServerImpl::setupListeners
std::vector< std::shared_ptr< Processor > > setupListeners()
Definition: GRPCServer.cpp:329
ripple::Application::journal
virtual beast::Journal journal(std::string const &name)=0
ripple::GRPCServerImpl::CallData::ctx_
grpc::ServerContext ctx_
Definition: GRPCServer.h:153
ripple::RPC::conditionMet
error_code_i conditionMet(Condition condition_required, T &context)
Definition: Handler.h:78
std::vector::begin
T begin(T... args)
std
STL namespace.
ripple::Resource::Consumer
An endpoint that consumes resources.
Definition: Consumer.h:33
ripple::Resource::Charge
A consumption charge.
Definition: Charge.h:30
ripple::GRPCServerImpl::CallData
Definition: GRPCServer.h:138
std::string::empty
T empty(T... args)
ripple::GRPCServer::run
void run()
Definition: GRPCServer.cpp:441
std::string::find_first_of
T find_first_of(T... args)
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
std::size_t
ripple::doFeeGrpc
std::pair< org::xrpl::rpc::v1::GetFeeResponse, grpc::Status > doFeeGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetFeeRequest > &context)
Definition: Fee1.cpp:42
beast::IP::Endpoint
A version-independent IP address and port combination.
Definition: IPEndpoint.h:39
ripple::GRPCServerImpl::app_
Application & app_
Definition: GRPCServer.h:84
std::vector::end
T end(T... args)
beast::IP::Endpoint::from_string_checked
static boost::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
Definition: IPEndpoint.cpp:35
ripple::GRPCServerImpl::CallData::getRole
Role getRole()
Definition: GRPCServer.cpp:191
ripple::GRPCServerImpl::journal_
beast::Journal journal_
Definition: GRPCServer.h:88
ripple::Role
Role
Indicates the level of administrative permission to grant.
Definition: Role.h:40
ripple::GRPCServerImpl::CallData::CallData
CallData(org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService &service, grpc::ServerCompletionQueue &cq, Application &app, BindListener< Request, Response > bindListener, Handler< Request, Response > handler, RPC::Condition requiredCondition, Resource::Charge loadType)
Definition: GRPCServer.cpp:43
ripple::GRPCServerImpl::CallData::process
virtual void process() override
Definition: GRPCServer.cpp:82
ripple::GRPCServerImpl::serverAddress_
std::string serverAddress_
Definition: GRPCServer.h:86
std::thread::join
T join(T... args)
std::exception::what
T what(T... args)
ripple::doAccountInfoGrpc
std::pair< org::xrpl::rpc::v1::GetAccountInfoResponse, grpc::Status > doAccountInfoGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetAccountInfoRequest > &context)
Definition: AccountInfo.cpp:189
ripple::BasicConfig::exists
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Definition: BasicConfig.cpp:132
ripple::GRPCServerImpl::CallData::responder_
grpc::ServerAsyncResponseWriter< Response > responder_
Definition: GRPCServer.h:171
ripple::BasicConfig::section
Section & section(std::string const &name)
Returns the section with the given name.
Definition: BasicConfig.cpp:138
ripple::doSubmitGrpc
std::pair< org::xrpl::rpc::v1::SubmitTransactionResponse, grpc::Status > doSubmitGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::SubmitTransactionRequest > &context)
Definition: Submit.cpp:198