rippled
GRPCServer.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2020 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/main/GRPCServer.h>
21 #include <ripple/beast/core/CurrentThreadName.h>
22 #include <ripple/resource/Fees.h>
23 
24 namespace ripple {
25 
26 namespace {
27 
28 // helper function. strips scheme from endpoint string
30 getEndpoint(std::string const& peer)
31 {
32  std::size_t first = peer.find_first_of(":");
33  std::size_t last = peer.find_last_of(":");
34  std::string peerClean(peer);
35  if (first != last)
36  {
37  peerClean = peer.substr(first + 1);
38  }
39  return peerClean;
40 }
41 } // namespace
42 
43 template <class Request, class Response>
45  org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService& service,
46  grpc::ServerCompletionQueue& cq,
47  Application& app,
50  RPC::Condition requiredCondition,
51  Resource::Charge loadType)
52  : service_(service)
53  , cq_(cq)
54  , finished_(false)
55  , app_(app)
56  , responder_(&ctx_)
57  , bindListener_(std::move(bindListener))
58  , handler_(std::move(handler))
59  , requiredCondition_(std::move(requiredCondition))
60  , loadType_(std::move(loadType))
61 {
62  // Bind a listener. When a request is received, "this" will be returned
63  // from CompletionQueue::Next
65 }
66 
67 template <class Request, class Response>
70 {
71  return std::make_shared<CallData<Request, Response>>(
72  service_,
73  cq_,
74  app_,
75  bindListener_,
76  handler_,
77  requiredCondition_,
78  loadType_);
79 }
80 
81 template <class Request, class Response>
82 void
84 {
85  // sanity check
86  BOOST_ASSERT(!finished_);
87 
89  this->shared_from_this();
90 
91  // Need to set finished to true before processing the response,
92  // because as soon as the response is posted to the completion
93  // queue (via responder_.Finish(...) or responder_.FinishWithError(...)),
94  // the CallData object is returned as a tag in handleRpcs().
95  // handleRpcs() checks the finished variable, and if true, destroys
96  // the object. Setting finished to true before calling process
97  // ensures that finished is always true when this CallData object
98  // is returned as a tag in handleRpcs(), after sending the response
99  finished_ = true;
100  auto coro = app_.getJobQueue().postCoro(
101  JobType::jtRPC,
102  "gRPC-Client",
103  [thisShared](std::shared_ptr<JobQueue::Coro> coro) {
104  thisShared->process(coro);
105  });
106 
107  // If coro is null, then the JobQueue has already been shutdown
108  if (!coro)
109  {
110  grpc::Status status{
111  grpc::StatusCode::INTERNAL, "Job Queue is already stopped"};
112  responder_.FinishWithError(status, this);
113  }
114 }
115 
116 template <class Request, class Response>
117 void
120 {
121  try
122  {
123  auto usage = getUsage();
124  if (usage.disconnect())
125  {
126  grpc::Status status{
127  grpc::StatusCode::RESOURCE_EXHAUSTED,
128  "usage balance exceeds threshhold"};
129  responder_.FinishWithError(status, this);
130  }
131  else
132  {
133  auto loadType = getLoadType();
134  usage.charge(loadType);
135  auto role = getRole();
136 
138  {app_.journal("gRPCServer"),
139  app_,
140  loadType,
141  app_.getOPs(),
143  usage,
144  role,
145  coro,
147  apiVersion},
148  request_};
149 
150  // Make sure we can currently handle the rpc
151  error_code_i conditionMetRes =
152  RPC::conditionMet(requiredCondition_, context);
153 
154  if (conditionMetRes != rpcSUCCESS)
155  {
156  RPC::ErrorInfo errorInfo = RPC::get_error_info(conditionMetRes);
157  grpc::Status status{
158  grpc::StatusCode::FAILED_PRECONDITION,
159  errorInfo.message.c_str()};
160  responder_.FinishWithError(status, this);
161  }
162  else
163  {
164  std::pair<Response, grpc::Status> result = handler_(context);
165  responder_.Finish(result.first, result.second, this);
166  }
167  }
168  }
169  catch (std::exception const& ex)
170  {
171  grpc::Status status{grpc::StatusCode::INTERNAL, ex.what()};
172  responder_.FinishWithError(status, this);
173  }
174 }
175 
176 template <class Request, class Response>
177 bool
179 {
180  return finished_;
181 }
182 
183 template <class Request, class Response>
186 {
187  return loadType_;
188 }
189 
190 template <class Request, class Response>
191 Role
193 {
194  return Role::USER;
195 }
196 
197 template <class Request, class Response>
200 {
201  std::string peer = getEndpoint(ctx_.peer());
202  boost::optional<beast::IP::Endpoint> endpoint =
204  return app_.getResourceManager().newInboundEndpoint(endpoint.get());
205 }
206 
208  : app_(app), journal_(app_.journal("gRPC Server"))
209 {
210  // if present, get endpoint from config
211  if (app_.config().exists("port_grpc"))
212  {
213  Section section = app_.config().section("port_grpc");
214 
215  std::pair<std::string, bool> ipPair = section.find("ip");
216  if (!ipPair.second)
217  return;
218 
219  std::pair<std::string, bool> portPair = section.find("port");
220  if (!portPair.second)
221  return;
222  try
223  {
224  beast::IP::Endpoint endpoint(
225  boost::asio::ip::make_address(ipPair.first),
226  std::stoi(portPair.first));
227 
228  serverAddress_ = endpoint.to_string();
229  }
230  catch (std::exception const&)
231  {
232  }
233  }
234 }
235 
236 void
238 {
239  JLOG(journal_.debug()) << "Shutting down";
240 
241  // The below call cancels all "listeners" (CallData objects that are waiting
242  // for a request, as opposed to processing a request), and blocks until all
243  // requests being processed are completed. CallData objects in the midst of
244  // processing requests need to actually send data back to the client, via
245  // responder_.Finish(...) or responder_.FinishWithError(...), for this call
246  // to unblock. Each cancelled listener is returned via cq_.Next(...) with ok
247  // set to false
248  server_->Shutdown();
249  JLOG(journal_.debug()) << "Server has been shutdown";
250 
251  // Always shutdown the completion queue after the server. This call allows
252  // cq_.Next() to return false, once all events posted to the completion
253  // queue have been processed. See handleRpcs() for more details.
254  cq_->Shutdown();
255  JLOG(journal_.debug()) << "Completion Queue has been shutdown";
256 }
257 
258 void
260 {
261  // This collection should really be an unordered_set. However, to delete
262  // from the unordered_set, we need a shared_ptr, but cq_.Next() (see below
263  // while loop) sets the tag to a raw pointer.
265 
266  auto erase = [&requests](Processor* ptr) {
267  auto it = std::find_if(
268  requests.begin(),
269  requests.end(),
270  [ptr](std::shared_ptr<Processor>& sPtr) {
271  return sPtr.get() == ptr;
272  });
273  BOOST_ASSERT(it != requests.end());
274  it->swap(requests.back());
275  requests.pop_back();
276  };
277 
278  void* tag; // uniquely identifies a request.
279  bool ok;
280  // Block waiting to read the next event from the completion queue. The
281  // event is uniquely identified by its tag, which in this case is the
282  // memory address of a CallData instance.
283  // The return value of Next should always be checked. This return value
284  // tells us whether there is any kind of event or cq_ is shutting down.
285  // When cq_.Next(...) returns false, all work has been completed and the
286  // loop can exit. When the server is shutdown, each CallData object that is
287  // listening for a request is forceably cancelled, and is returned by
288  // cq_->Next() with ok set to false. Then, each CallData object processing
289  // a request must complete (by sending data to the client), each of which
290  // will be returned from cq_->Next() with ok set to true. After all
291  // cancelled listeners and all CallData objects processing requests are
292  // returned via cq_->Next(), cq_->Next() will return false, causing the
293  // loop to exit.
294  while (cq_->Next(&tag, &ok))
295  {
296  auto ptr = static_cast<Processor*>(tag);
297  JLOG(journal_.trace()) << "Processing CallData object."
298  << " ptr = " << ptr << " ok = " << ok;
299 
300  if (!ok)
301  {
302  JLOG(journal_.debug()) << "Request listener cancelled. "
303  << "Destroying object";
304  erase(ptr);
305  }
306  else
307  {
308  if (!ptr->isFinished())
309  {
310  JLOG(journal_.debug()) << "Received new request. Processing";
311  // ptr is now processing a request, so create a new CallData
312  // object to handle additional requests
313  auto cloned = ptr->clone();
314  requests.push_back(cloned);
315  // process the request
316  ptr->process();
317  }
318  else
319  {
320  JLOG(journal_.debug()) << "Sent response. Destroying object";
321  erase(ptr);
322  }
323  }
324  }
325  JLOG(journal_.debug()) << "Completion Queue drained";
326 }
327 
328 // create a CallData instance for each RPC
331 {
333 
334  auto addToRequests = [&requests](auto callData) {
335  requests.push_back(std::move(callData));
336  };
337 
338  {
339  using cd = CallData<
340  org::xrpl::rpc::v1::GetFeeRequest,
341  org::xrpl::rpc::v1::GetFeeResponse>;
342 
343  addToRequests(std::make_shared<cd>(
344  service_,
345  *cq_,
346  app_,
347  &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
348  RequestGetFee,
349  doFeeGrpc,
352  }
353  {
354  using cd = CallData<
355  org::xrpl::rpc::v1::GetAccountInfoRequest,
356  org::xrpl::rpc::v1::GetAccountInfoResponse>;
357 
358  addToRequests(std::make_shared<cd>(
359  service_,
360  *cq_,
361  app_,
362  &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
363  RequestGetAccountInfo,
367  }
368  {
369  using cd = CallData<
370  org::xrpl::rpc::v1::GetTransactionRequest,
371  org::xrpl::rpc::v1::GetTransactionResponse>;
372 
373  addToRequests(std::make_shared<cd>(
374  service_,
375  *cq_,
376  app_,
377  &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
378  RequestGetTransaction,
379  doTxGrpc,
382  }
383  {
384  using cd = CallData<
385  org::xrpl::rpc::v1::SubmitTransactionRequest,
386  org::xrpl::rpc::v1::SubmitTransactionResponse>;
387 
388  addToRequests(std::make_shared<cd>(
389  service_,
390  *cq_,
391  app_,
392  &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
393  RequestSubmitTransaction,
394  doSubmitGrpc,
397  }
398 
399  {
400  using cd = CallData<
401  org::xrpl::rpc::v1::GetAccountTransactionHistoryRequest,
402  org::xrpl::rpc::v1::GetAccountTransactionHistoryResponse>;
403 
404  addToRequests(std::make_shared<cd>(
405  service_,
406  *cq_,
407  app_,
408  &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
409  RequestGetAccountTransactionHistory,
413  }
414  return requests;
415 };
416 
417 bool
419 {
420  // if config does not specify a grpc server address, don't start
421  if (serverAddress_.empty())
422  return false;
423 
424  JLOG(journal_.info()) << "Starting gRPC server at " << serverAddress_;
425 
426  grpc::ServerBuilder builder;
427  // Listen on the given address without any authentication mechanism.
428  builder.AddListeningPort(serverAddress_, grpc::InsecureServerCredentials());
429  // Register "service_" as the instance through which we'll communicate with
430  // clients. In this case it corresponds to an *asynchronous* service.
431  builder.RegisterService(&service_);
432  // Get hold of the completion queue used for the asynchronous communication
433  // with the gRPC runtime.
434  cq_ = builder.AddCompletionQueue();
435  // Finally assemble the server.
436  server_ = builder.BuildAndStart();
437 
438  return true;
439 }
440 
441 void
443 {
444  // Start the server and setup listeners
445  if (running_ = impl_.start(); running_)
446  {
447  thread_ = std::thread([this]() {
448  // Start the event loop and begin handling requests
449  beast::setCurrentThreadName("rippled: grpc");
450  this->impl_.handleRpcs();
451  });
452  }
453 }
454 
455 void
457 {
458  if (running_)
459  {
460  impl_.shutdown();
461  thread_.join();
462  running_ = false;
463  }
464 
465  stopped();
466 }
467 
469 {
470  assert(!running_);
471 }
472 
473 } // namespace ripple
ripple::Resource::Manager::newInboundEndpoint
virtual Consumer newInboundEndpoint(beast::IP::Endpoint const &address)=0
Create a new endpoint keyed by inbound IP address or the forwarded IP if proxied.
ripple::Section
Holds a collection of configuration values.
Definition: BasicConfig.h:43
ripple::GRPCServer::onStart
void onStart() override
Override called during start.
Definition: GRPCServer.cpp:442
ripple::Application
Definition: Application.h:97
ripple::Processor
Definition: GRPCServer.h:42
std::string
STL class.
std::shared_ptr
STL class.
ripple::JobQueue::postCoro
std::shared_ptr< Coro > postCoro(JobType t, std::string const &name, F &&f)
Creates a coroutine and adds a job to the queue which will run it.
Definition: JobQueue.h:427
ripple::GRPCServer::impl_
GRPCServerImpl impl_
Definition: GRPCServer.h:260
std::exception
STL class.
ripple::Stoppable::stopped
void stopped()
Called by derived classes to indicate that the stoppable has stopped.
Definition: Stoppable.cpp:72
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::GRPCServerImpl::CallData::clone
std::shared_ptr< Processor > clone() override
Definition: GRPCServer.cpp:69
ripple::RPC::get_error_info
ErrorInfo const & get_error_info(error_code_i code)
Returns an ErrorInfo that reflects the error code.
Definition: ErrorCodes.cpp:195
ripple::Resource::feeMediumBurdenRPC
const Charge feeMediumBurdenRPC
beast::IP::Endpoint::to_string
std::string to_string() const
Returns a string representing the endpoint.
Definition: IPEndpoint.cpp:54
std::pair
ripple::GRPCServerImpl::apiVersion
static constexpr unsigned apiVersion
Definition: GRPCServer.h:110
ripple::GRPCServerImpl::start
bool start()
Definition: GRPCServer.cpp:418
std::vector
STL class.
std::find_if
T find_if(T... args)
ripple::GRPCServerImpl::shutdown
void shutdown()
Definition: GRPCServer.cpp:237
ripple::GRPCServerImpl::CallData::request_
Request request_
Definition: GRPCServer.h:166
ripple::GRPCServerImpl::service_
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService service_
Definition: GRPCServer.h:81
ripple::doAccountTxGrpc
std::pair< org::xrpl::rpc::v1::GetAccountTransactionHistoryResponse, grpc::Status > doAccountTxGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetAccountTransactionHistoryRequest > &context)
Definition: AccountTx.cpp:581
ripple::GRPCServerImpl::CallData::cq_
grpc::ServerCompletionQueue & cq_
Definition: GRPCServer.h:149
ripple::GRPCServer::running_
bool running_
Definition: GRPCServer.h:262
ripple::GRPCServerImpl::server_
std::unique_ptr< grpc::Server > server_
Definition: GRPCServer.h:83
ripple::Resource::feeReferenceRPC
const Charge feeReferenceRPC
std::vector::back
T back(T... args)
std::function
ripple::GRPCServerImpl::handleRpcs
void handleRpcs()
Definition: GRPCServer.cpp:259
Json::StaticString::c_str
constexpr const char * c_str() const
Definition: json_value.h:73
ripple::GRPCServerImpl::CallData::isFinished
virtual bool isFinished() override
Definition: GRPCServer.cpp:178
ripple::Application::getOPs
virtual NetworkOPs & getOPs()=0
ripple::error_code_i
error_code_i
Definition: ErrorCodes.h:40
std::vector::push_back
T push_back(T... args)
ripple::RPC::ErrorInfo::message
Json::StaticString message
Definition: ErrorCodes.h:177
ripple::erase
void erase(STObject &st, TypedField< U > const &f)
Remove a field in an STObject.
Definition: STExchange.h:171
ripple::GRPCServer::thread_
std::thread thread_
Definition: GRPCServer.h:261
ripple::GRPCServerImpl::CallData::service_
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService & service_
Definition: GRPCServer.h:146
std::stoi
T stoi(T... args)
ripple::rpcSUCCESS
@ rpcSUCCESS
Definition: ErrorCodes.h:44
ripple::GRPCServerImpl::cq_
std::unique_ptr< grpc::ServerCompletionQueue > cq_
Definition: GRPCServer.h:76
ripple::GRPCServerImpl::CallData::getUsage
Resource::Consumer getUsage()
Definition: GRPCServer.cpp:199
std::thread
STL class.
ripple::InfoSub::pointer
std::shared_ptr< InfoSub > pointer
Definition: InfoSub.h:43
ripple::Application::getLedgerMaster
virtual LedgerMaster & getLedgerMaster()=0
ripple::Role::USER
@ USER
ripple::RPC::NEEDS_CURRENT_LEDGER
@ NEEDS_CURRENT_LEDGER
Definition: Handler.h:42
ripple::Application::config
virtual Config & config()=0
ripple::RPC::GRPCContext
Definition: Context.h:70
std::string::find_last_of
T find_last_of(T... args)
ripple::Application::getJobQueue
virtual JobQueue & getJobQueue()=0
ripple::doTxGrpc
std::pair< org::xrpl::rpc::v1::GetTransactionResponse, grpc::Status > doTxGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetTransactionRequest > &context)
Definition: Tx.cpp:326
ripple::RPC::NO_CONDITION
@ NO_CONDITION
Definition: Handler.h:40
beast::Journal::info
Stream info() const
Definition: Journal.h:321
ripple::GRPCServerImpl::GRPCServerImpl
GRPCServerImpl(Application &app)
Definition: GRPCServer.cpp:207
ripple::RPC::Condition
Condition
Definition: Handler.h:39
ripple::RPC::ErrorInfo
Maps an rpc error code to its token and default message.
Definition: ErrorCodes.h:159
ripple::GRPCServerImpl::CallData::getLoadType
Resource::Charge getLoadType()
Definition: GRPCServer.cpp:185
std::vector::pop_back
T pop_back(T... args)
ripple::Section::find
std::pair< std::string, bool > find(std::string const &name) const
Retrieve a key/value pair.
Definition: BasicConfig.cpp:113
ripple::GRPCServerImpl::CallData::bindListener_
BindListener< Request, Response > bindListener_
Definition: GRPCServer.h:175
ripple::Application::getResourceManager
virtual Resource::Manager & getResourceManager()=0
std::string::substr
T substr(T... args)
beast::setCurrentThreadName
void setCurrentThreadName(std::string_view name)
Changes the name of the caller thread.
Definition: CurrentThreadName.cpp:119
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::GRPCServerImpl::setupListeners
std::vector< std::shared_ptr< Processor > > setupListeners()
Definition: GRPCServer.cpp:330
ripple::Application::journal
virtual beast::Journal journal(std::string const &name)=0
ripple::GRPCServerImpl::CallData::ctx_
grpc::ServerContext ctx_
Definition: GRPCServer.h:154
ripple::RPC::conditionMet
error_code_i conditionMet(Condition condition_required, T &context)
Definition: Handler.h:78
std::vector::begin
T begin(T... args)
std
STL namespace.
ripple::Resource::Consumer
An endpoint that consumes resources.
Definition: Consumer.h:33
ripple::Resource::Charge
A consumption charge.
Definition: Charge.h:30
ripple::GRPCServerImpl::CallData
Definition: GRPCServer.h:139
std::string::empty
T empty(T... args)
std::string::find_first_of
T find_first_of(T... args)
ripple::GRPCServer::onStop
void onStop() override
Override called when the stop notification is issued.
Definition: GRPCServer.cpp:456
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
std::size_t
ripple::doFeeGrpc
std::pair< org::xrpl::rpc::v1::GetFeeResponse, grpc::Status > doFeeGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetFeeRequest > &context)
Definition: Fee1.cpp:42
beast::IP::Endpoint
A version-independent IP address and port combination.
Definition: IPEndpoint.h:39
ripple::GRPCServerImpl::app_
Application & app_
Definition: GRPCServer.h:85
std::vector::end
T end(T... args)
beast::IP::Endpoint::from_string_checked
static boost::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
Definition: IPEndpoint.cpp:35
ripple::GRPCServerImpl::CallData::getRole
Role getRole()
Definition: GRPCServer.cpp:192
ripple::GRPCServerImpl::journal_
beast::Journal journal_
Definition: GRPCServer.h:89
ripple::GRPCServer::~GRPCServer
~GRPCServer() override
Definition: GRPCServer.cpp:468
ripple::Role
Role
Indicates the level of administrative permission to grant.
Definition: Role.h:40
ripple::GRPCServerImpl::CallData::CallData
CallData(org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService &service, grpc::ServerCompletionQueue &cq, Application &app, BindListener< Request, Response > bindListener, Handler< Request, Response > handler, RPC::Condition requiredCondition, Resource::Charge loadType)
Definition: GRPCServer.cpp:44
ripple::GRPCServerImpl::CallData::process
virtual void process() override
Definition: GRPCServer.cpp:83
ripple::GRPCServerImpl::serverAddress_
std::string serverAddress_
Definition: GRPCServer.h:87
std::thread::join
T join(T... args)
std::exception::what
T what(T... args)
ripple::doAccountInfoGrpc
std::pair< org::xrpl::rpc::v1::GetAccountInfoResponse, grpc::Status > doAccountInfoGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetAccountInfoRequest > &context)
Definition: AccountInfo.cpp:216
ripple::BasicConfig::exists
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Definition: BasicConfig.cpp:132
ripple::GRPCServerImpl::CallData::responder_
grpc::ServerAsyncResponseWriter< Response > responder_
Definition: GRPCServer.h:172
ripple::BasicConfig::section
Section & section(std::string const &name)
Returns the section with the given name.
Definition: BasicConfig.cpp:138
ripple::doSubmitGrpc
std::pair< org::xrpl::rpc::v1::SubmitTransactionResponse, grpc::Status > doSubmitGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::SubmitTransactionRequest > &context)
Definition: Submit.cpp:198