rippled
GRPCServer.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2020 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/main/GRPCServer.h>
21 #include <ripple/resource/Fees.h>
22 
23 namespace ripple {
24 
25 namespace {
26 
27 // helper function. strips scheme from endpoint string
29 getEndpoint(std::string const& peer)
30 {
31  std::size_t first = peer.find_first_of(":");
32  std::size_t last = peer.find_last_of(":");
33  std::string peerClean(peer);
34  if (first != last)
35  {
36  peerClean = peer.substr(first + 1);
37  }
38  return peerClean;
39 }
40 } // namespace
41 
42 template <class Request, class Response>
44  org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService& service,
45  grpc::ServerCompletionQueue& cq,
46  Application& app,
49  RPC::Condition requiredCondition,
50  Resource::Charge loadType)
51  : service_(service)
52  , cq_(cq)
53  , finished_(false)
54  , app_(app)
55  , responder_(&ctx_)
56  , bindListener_(std::move(bindListener))
57  , handler_(std::move(handler))
58  , requiredCondition_(std::move(requiredCondition))
59  , loadType_(std::move(loadType))
60 {
61  // Bind a listener. When a request is received, "this" will be returned
62  // from CompletionQueue::Next
64 }
65 
66 template <class Request, class Response>
69 {
70  return std::make_shared<CallData<Request, Response>>(
71  service_,
72  cq_,
73  app_,
74  bindListener_,
75  handler_,
76  requiredCondition_,
77  loadType_);
78 }
79 
80 template <class Request, class Response>
81 void
83 {
84  // sanity check
85  BOOST_ASSERT(!finished_);
86 
88  this->shared_from_this();
89 
90  // Need to set finished to true before processing the response,
91  // because as soon as the response is posted to the completion
92  // queue (via responder_.Finish(...) or responder_.FinishWithError(...)),
93  // the CallData object is returned as a tag in handleRpcs().
94  // handleRpcs() checks the finished variable, and if true, destroys
95  // the object. Setting finished to true before calling process
96  // ensures that finished is always true when this CallData object
97  // is returned as a tag in handleRpcs(), after sending the response
98  finished_ = true;
99  auto coro = app_.getJobQueue().postCoro(
100  JobType::jtRPC,
101  "gRPC-Client",
102  [thisShared](std::shared_ptr<JobQueue::Coro> coro) {
103  thisShared->process(coro);
104  });
105 
106  // If coro is null, then the JobQueue has already been shutdown
107  if (!coro)
108  {
109  grpc::Status status{
110  grpc::StatusCode::INTERNAL, "Job Queue is already stopped"};
111  responder_.FinishWithError(status, this);
112  }
113 }
114 
115 template <class Request, class Response>
116 void
119 {
120  try
121  {
122  auto usage = getUsage();
123  if (usage.disconnect())
124  {
125  grpc::Status status{
126  grpc::StatusCode::RESOURCE_EXHAUSTED,
127  "usage balance exceeds threshhold"};
128  responder_.FinishWithError(status, this);
129  }
130  else
131  {
132  auto loadType = getLoadType();
133  usage.charge(loadType);
134  auto role = getRole();
135 
137  {app_.journal("gRPCServer"),
138  app_,
139  loadType,
140  app_.getOPs(),
142  usage,
143  role,
144  coro,
145  InfoSub::pointer()},
146  request_};
147 
148  // Make sure we can currently handle the rpc
149  error_code_i conditionMetRes =
150  RPC::conditionMet(requiredCondition_, context);
151 
152  if (conditionMetRes != rpcSUCCESS)
153  {
154  RPC::ErrorInfo errorInfo = RPC::get_error_info(conditionMetRes);
155  grpc::Status status{
156  grpc::StatusCode::FAILED_PRECONDITION,
157  errorInfo.message.c_str()};
158  responder_.FinishWithError(status, this);
159  }
160  else
161  {
162  std::pair<Response, grpc::Status> result = handler_(context);
163  responder_.Finish(result.first, result.second, this);
164  }
165  }
166  }
167  catch (std::exception const& ex)
168  {
169  grpc::Status status{grpc::StatusCode::INTERNAL, ex.what()};
170  responder_.FinishWithError(status, this);
171  }
172 }
173 
174 template <class Request, class Response>
175 bool
177 {
178  return finished_;
179 }
180 
181 template <class Request, class Response>
184 {
185  return loadType_;
186 }
187 
188 template <class Request, class Response>
189 Role
191 {
192  return Role::USER;
193 }
194 
195 template <class Request, class Response>
198 {
199  std::string peer = getEndpoint(ctx_.peer());
200  boost::optional<beast::IP::Endpoint> endpoint =
202  return app_.getResourceManager().newInboundEndpoint(endpoint.get());
203 }
204 
206  : app_(app), journal_(app_.journal("gRPC Server"))
207 {
208  // if present, get endpoint from config
209  if (app_.config().exists("port_grpc"))
210  {
211  Section section = app_.config().section("port_grpc");
212 
213  std::pair<std::string, bool> ipPair = section.find("ip");
214  if (!ipPair.second)
215  return;
216 
217  std::pair<std::string, bool> portPair = section.find("port");
218  if (!portPair.second)
219  return;
220  try
221  {
222  beast::IP::Endpoint endpoint(
223  boost::asio::ip::make_address(ipPair.first),
224  std::stoi(portPair.first));
225 
226  serverAddress_ = endpoint.to_string();
227  }
228  catch (std::exception const&)
229  {
230  }
231  }
232 }
233 
234 void
236 {
237  JLOG(journal_.debug()) << "Shutting down";
238 
239  // The below call cancels all "listeners" (CallData objects that are waiting
240  // for a request, as opposed to processing a request), and blocks until all
241  // requests being processed are completed. CallData objects in the midst of
242  // processing requests need to actually send data back to the client, via
243  // responder_.Finish(...) or responder_.FinishWithError(...), for this call
244  // to unblock. Each cancelled listener is returned via cq_.Next(...) with ok
245  // set to false
246  server_->Shutdown();
247  JLOG(journal_.debug()) << "Server has been shutdown";
248 
249  // Always shutdown the completion queue after the server. This call allows
250  // cq_.Next() to return false, once all events posted to the completion
251  // queue have been processed. See handleRpcs() for more details.
252  cq_->Shutdown();
253  JLOG(journal_.debug()) << "Completion Queue has been shutdown";
254 }
255 
256 void
258 {
259  // This collection should really be an unordered_set. However, to delete
260  // from the unordered_set, we need a shared_ptr, but cq_.Next() (see below
261  // while loop) sets the tag to a raw pointer.
263 
264  auto erase = [&requests](Processor* ptr) {
265  auto it = std::find_if(
266  requests.begin(),
267  requests.end(),
268  [ptr](std::shared_ptr<Processor>& sPtr) {
269  return sPtr.get() == ptr;
270  });
271  BOOST_ASSERT(it != requests.end());
272  it->swap(requests.back());
273  requests.pop_back();
274  };
275 
276  void* tag; // uniquely identifies a request.
277  bool ok;
278  // Block waiting to read the next event from the completion queue. The
279  // event is uniquely identified by its tag, which in this case is the
280  // memory address of a CallData instance.
281  // The return value of Next should always be checked. This return value
282  // tells us whether there is any kind of event or cq_ is shutting down.
283  // When cq_.Next(...) returns false, all work has been completed and the
284  // loop can exit. When the server is shutdown, each CallData object that is
285  // listening for a request is forceably cancelled, and is returned by
286  // cq_->Next() with ok set to false. Then, each CallData object processing
287  // a request must complete (by sending data to the client), each of which
288  // will be returned from cq_->Next() with ok set to true. After all
289  // cancelled listeners and all CallData objects processing requests are
290  // returned via cq_->Next(), cq_->Next() will return false, causing the
291  // loop to exit.
292  while (cq_->Next(&tag, &ok))
293  {
294  auto ptr = static_cast<Processor*>(tag);
295  JLOG(journal_.trace()) << "Processing CallData object."
296  << " ptr = " << ptr << " ok = " << ok;
297 
298  if (!ok)
299  {
300  JLOG(journal_.debug()) << "Request listener cancelled. "
301  << "Destroying object";
302  erase(ptr);
303  }
304  else
305  {
306  if (!ptr->isFinished())
307  {
308  JLOG(journal_.debug()) << "Received new request. Processing";
309  // ptr is now processing a request, so create a new CallData
310  // object to handle additional requests
311  auto cloned = ptr->clone();
312  requests.push_back(cloned);
313  // process the request
314  ptr->process();
315  }
316  else
317  {
318  JLOG(journal_.debug()) << "Sent response. Destroying object";
319  erase(ptr);
320  }
321  }
322  }
323  JLOG(journal_.debug()) << "Completion Queue drained";
324 }
325 
326 // create a CallData instance for each RPC
329 {
331 
332  auto addToRequests = [&requests](auto callData) {
333  requests.push_back(std::move(callData));
334  };
335 
336  {
337  using cd = CallData<
338  org::xrpl::rpc::v1::GetFeeRequest,
339  org::xrpl::rpc::v1::GetFeeResponse>;
340 
341  addToRequests(std::make_shared<cd>(
342  service_,
343  *cq_,
344  app_,
345  &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
346  RequestGetFee,
347  doFeeGrpc,
350  }
351  {
352  using cd = CallData<
353  org::xrpl::rpc::v1::GetAccountInfoRequest,
354  org::xrpl::rpc::v1::GetAccountInfoResponse>;
355 
356  addToRequests(std::make_shared<cd>(
357  service_,
358  *cq_,
359  app_,
360  &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
361  RequestGetAccountInfo,
365  }
366  {
367  using cd = CallData<
368  org::xrpl::rpc::v1::GetTransactionRequest,
369  org::xrpl::rpc::v1::GetTransactionResponse>;
370 
371  addToRequests(std::make_shared<cd>(
372  service_,
373  *cq_,
374  app_,
375  &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
376  RequestGetTransaction,
377  doTxGrpc,
380  }
381  {
382  using cd = CallData<
383  org::xrpl::rpc::v1::SubmitTransactionRequest,
384  org::xrpl::rpc::v1::SubmitTransactionResponse>;
385 
386  addToRequests(std::make_shared<cd>(
387  service_,
388  *cq_,
389  app_,
390  &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
391  RequestSubmitTransaction,
392  doSubmitGrpc,
395  }
396 
397  {
398  using cd = CallData<
399  org::xrpl::rpc::v1::GetAccountTransactionHistoryRequest,
400  org::xrpl::rpc::v1::GetAccountTransactionHistoryResponse>;
401 
402  addToRequests(std::make_shared<cd>(
403  service_,
404  *cq_,
405  app_,
406  &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
407  RequestGetAccountTransactionHistory,
411  }
412  return requests;
413 };
414 
415 bool
417 {
418  // if config does not specify a grpc server address, don't start
419  if (serverAddress_.empty())
420  return false;
421 
422  JLOG(journal_.info()) << "Starting gRPC server at " << serverAddress_;
423 
424  grpc::ServerBuilder builder;
425  // Listen on the given address without any authentication mechanism.
426  builder.AddListeningPort(serverAddress_, grpc::InsecureServerCredentials());
427  // Register "service_" as the instance through which we'll communicate with
428  // clients. In this case it corresponds to an *asynchronous* service.
429  builder.RegisterService(&service_);
430  // Get hold of the completion queue used for the asynchronous communication
431  // with the gRPC runtime.
432  cq_ = builder.AddCompletionQueue();
433  // Finally assemble the server.
434  server_ = builder.BuildAndStart();
435 
436  return true;
437 }
438 
439 void
441 {
442  // Start the server and setup listeners
443  if ((running_ = impl_.start()))
444  {
445  thread_ = std::thread([this]() {
446  // Start the event loop and begin handling requests
447  this->impl_.handleRpcs();
448  });
449  }
450 }
451 
453 {
454  if (running_)
455  {
456  impl_.shutdown();
457  thread_.join();
458  }
459 }
460 
461 } // namespace ripple
ripple::Resource::Manager::newInboundEndpoint
virtual Consumer newInboundEndpoint(beast::IP::Endpoint const &address)=0
Create a new endpoint keyed by inbound IP address or the forwarded IP if proxied.
ripple::Section
Holds a collection of configuration values.
Definition: BasicConfig.h:43
ripple::Application
Definition: Application.h:97
ripple::Processor
Definition: GRPCServer.h:41
std::string
STL class.
std::shared_ptr
STL class.
ripple::JobQueue::postCoro
std::shared_ptr< Coro > postCoro(JobType t, std::string const &name, F &&f)
Creates a coroutine and adds a job to the queue which will run it.
Definition: JobQueue.h:427
ripple::GRPCServer::impl_
GRPCServerImpl impl_
Definition: GRPCServer.h:251
std::exception
STL class.
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::GRPCServerImpl::CallData::clone
std::shared_ptr< Processor > clone() override
Definition: GRPCServer.cpp:68
ripple::RPC::get_error_info
ErrorInfo const & get_error_info(error_code_i code)
Returns an ErrorInfo that reflects the error code.
Definition: ErrorCodes.cpp:194
ripple::Resource::feeMediumBurdenRPC
const Charge feeMediumBurdenRPC
beast::IP::Endpoint::to_string
std::string to_string() const
Returns a string representing the endpoint.
Definition: IPEndpoint.cpp:54
std::pair
ripple::GRPCServerImpl::start
bool start()
Definition: GRPCServer.cpp:416
std::vector
STL class.
std::find_if
T find_if(T... args)
ripple::GRPCServerImpl::shutdown
void shutdown()
Definition: GRPCServer.cpp:235
ripple::GRPCServerImpl::CallData::request_
Request request_
Definition: GRPCServer.h:163
ripple::GRPCServerImpl::service_
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService service_
Definition: GRPCServer.h:80
ripple::doAccountTxGrpc
std::pair< org::xrpl::rpc::v1::GetAccountTransactionHistoryResponse, grpc::Status > doAccountTxGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetAccountTransactionHistoryRequest > &context)
Definition: AccountTx.cpp:571
ripple::GRPCServerImpl::CallData::cq_
grpc::ServerCompletionQueue & cq_
Definition: GRPCServer.h:146
ripple::GRPCServer::running_
bool running_
Definition: GRPCServer.h:253
ripple::GRPCServerImpl::server_
std::unique_ptr< grpc::Server > server_
Definition: GRPCServer.h:82
ripple::Resource::feeReferenceRPC
const Charge feeReferenceRPC
std::vector::back
T back(T... args)
std::function
ripple::GRPCServerImpl::handleRpcs
void handleRpcs()
Definition: GRPCServer.cpp:257
Json::StaticString::c_str
constexpr const char * c_str() const
Definition: json_value.h:73
ripple::GRPCServerImpl::CallData::isFinished
virtual bool isFinished() override
Definition: GRPCServer.cpp:176
ripple::Application::getOPs
virtual NetworkOPs & getOPs()=0
ripple::error_code_i
error_code_i
Definition: ErrorCodes.h:40
std::vector::push_back
T push_back(T... args)
ripple::RPC::ErrorInfo::message
Json::StaticString message
Definition: ErrorCodes.h:175
ripple::erase
void erase(STObject &st, TypedField< U > const &f)
Remove a field in an STObject.
Definition: STExchange.h:171
ripple::GRPCServer::thread_
std::thread thread_
Definition: GRPCServer.h:252
ripple::GRPCServerImpl::CallData::service_
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService & service_
Definition: GRPCServer.h:143
std::stoi
T stoi(T... args)
ripple::rpcSUCCESS
@ rpcSUCCESS
Definition: ErrorCodes.h:44
ripple::GRPCServerImpl::cq_
std::unique_ptr< grpc::ServerCompletionQueue > cq_
Definition: GRPCServer.h:75
ripple::GRPCServerImpl::CallData::getUsage
Resource::Consumer getUsage()
Definition: GRPCServer.cpp:197
std::thread
STL class.
ripple::InfoSub::pointer
std::shared_ptr< InfoSub > pointer
Definition: InfoSub.h:49
ripple::Application::getLedgerMaster
virtual LedgerMaster & getLedgerMaster()=0
ripple::Role::USER
@ USER
ripple::RPC::NEEDS_CURRENT_LEDGER
@ NEEDS_CURRENT_LEDGER
Definition: Handler.h:42
ripple::Application::config
virtual Config & config()=0
ripple::RPC::GRPCContext
Definition: Context.h:70
std::string::find_last_of
T find_last_of(T... args)
ripple::Application::getJobQueue
virtual JobQueue & getJobQueue()=0
ripple::doTxGrpc
std::pair< org::xrpl::rpc::v1::GetTransactionResponse, grpc::Status > doTxGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetTransactionRequest > &context)
Definition: Tx.cpp:389
ripple::RPC::NO_CONDITION
@ NO_CONDITION
Definition: Handler.h:40
beast::Journal::info
Stream info() const
Definition: Journal.h:321
ripple::GRPCServerImpl::GRPCServerImpl
GRPCServerImpl(Application &app)
Definition: GRPCServer.cpp:205
ripple::RPC::Condition
Condition
Definition: Handler.h:39
ripple::RPC::ErrorInfo
Maps an rpc error code to its token and default message.
Definition: ErrorCodes.h:157
ripple::GRPCServerImpl::CallData::getLoadType
Resource::Charge getLoadType()
Definition: GRPCServer.cpp:183
std::vector::pop_back
T pop_back(T... args)
ripple::Section::find
std::pair< std::string, bool > find(std::string const &name) const
Retrieve a key/value pair.
Definition: BasicConfig.cpp:113
ripple::GRPCServerImpl::CallData::bindListener_
BindListener< Request, Response > bindListener_
Definition: GRPCServer.h:172
ripple::Application::getResourceManager
virtual Resource::Manager & getResourceManager()=0
std::string::substr
T substr(T... args)
ripple::GRPCServer::~GRPCServer
~GRPCServer()
Definition: GRPCServer.cpp:452
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::GRPCServerImpl::setupListeners
std::vector< std::shared_ptr< Processor > > setupListeners()
Definition: GRPCServer.cpp:328
ripple::Application::journal
virtual beast::Journal journal(std::string const &name)=0
ripple::GRPCServerImpl::CallData::ctx_
grpc::ServerContext ctx_
Definition: GRPCServer.h:151
ripple::RPC::conditionMet
error_code_i conditionMet(Condition condition_required, T &context)
Definition: Handler.h:78
std::vector::begin
T begin(T... args)
std
STL namespace.
ripple::Resource::Consumer
An endpoint that consumes resources.
Definition: Consumer.h:33
ripple::Resource::Charge
A consumption charge.
Definition: Charge.h:30
ripple::GRPCServerImpl::CallData
Definition: GRPCServer.h:136
std::string::empty
T empty(T... args)
ripple::GRPCServer::run
void run()
Definition: GRPCServer.cpp:440
std::string::find_first_of
T find_first_of(T... args)
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
std::size_t
ripple::doFeeGrpc
std::pair< org::xrpl::rpc::v1::GetFeeResponse, grpc::Status > doFeeGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetFeeRequest > &context)
Definition: Fee1.cpp:42
beast::IP::Endpoint
A version-independent IP address and port combination.
Definition: IPEndpoint.h:39
ripple::GRPCServerImpl::app_
Application & app_
Definition: GRPCServer.h:84
std::vector::end
T end(T... args)
beast::IP::Endpoint::from_string_checked
static boost::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
Definition: IPEndpoint.cpp:35
ripple::GRPCServerImpl::CallData::getRole
Role getRole()
Definition: GRPCServer.cpp:190
ripple::GRPCServerImpl::journal_
beast::Journal journal_
Definition: GRPCServer.h:88
ripple::Role
Role
Indicates the level of administrative permission to grant.
Definition: Role.h:40
ripple::GRPCServerImpl::CallData::CallData
CallData(org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService &service, grpc::ServerCompletionQueue &cq, Application &app, BindListener< Request, Response > bindListener, Handler< Request, Response > handler, RPC::Condition requiredCondition, Resource::Charge loadType)
Definition: GRPCServer.cpp:43
ripple::GRPCServerImpl::CallData::process
virtual void process() override
Definition: GRPCServer.cpp:82
ripple::GRPCServerImpl::serverAddress_
std::string serverAddress_
Definition: GRPCServer.h:86
std::thread::join
T join(T... args)
std::exception::what
T what(T... args)
ripple::doAccountInfoGrpc
std::pair< org::xrpl::rpc::v1::GetAccountInfoResponse, grpc::Status > doAccountInfoGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::GetAccountInfoRequest > &context)
Definition: AccountInfo.cpp:189
ripple::BasicConfig::exists
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Definition: BasicConfig.cpp:132
ripple::GRPCServerImpl::CallData::responder_
grpc::ServerAsyncResponseWriter< Response > responder_
Definition: GRPCServer.h:169
ripple::BasicConfig::section
Section & section(std::string const &name)
Returns the section with the given name.
Definition: BasicConfig.cpp:138
ripple::doSubmitGrpc
std::pair< org::xrpl::rpc::v1::SubmitTransactionResponse, grpc::Status > doSubmitGrpc(RPC::GRPCContext< org::xrpl::rpc::v1::SubmitTransactionRequest > &context)
Definition: Submit.cpp:198