rippled
Loading...
Searching...
No Matches
GRPCServer.h
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#ifndef RIPPLE_CORE_GRPCSERVER_H_INCLUDED
21#define RIPPLE_CORE_GRPCSERVER_H_INCLUDED
22
23#include <xrpld/app/main/Application.h>
24#include <xrpld/core/JobQueue.h>
25#include <xrpld/net/InfoSub.h>
26#include <xrpld/rpc/Context.h>
27#include <xrpld/rpc/GRPCHandlers.h>
28#include <xrpld/rpc/Role.h>
29#include <xrpld/rpc/detail/Handler.h>
30#include <xrpld/rpc/detail/RPCHelpers.h>
31#include <xrpld/rpc/detail/Tuning.h>
32#include <xrpl/protocol/ErrorCodes.h>
33#include <xrpl/resource/Charge.h>
34
35#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
36#include <grpcpp/grpcpp.h>
37
38namespace ripple {
39
40// Interface that CallData implements
42{
43public:
44 virtual ~Processor() = default;
45
46 Processor() = default;
47
48 Processor(const Processor&) = delete;
49
51 operator=(const Processor&) = delete;
52
53 // process a request that has arrived. Can only be called once per instance
54 virtual void
55 process() = 0;
56
57 // create a new instance of this CallData object, with the same type
58 //(same template parameters) as original. This is called when a CallData
59 // object starts processing a request. Creating a new instance allows the
60 // server to handle additional requests while the first is being processed
62 clone() = 0;
63
64 // true if this object has finished processing the request. Object will be
65 // deleted once this function returns true
66 virtual bool
68};
69
70class GRPCServerImpl final
71{
72private:
73 // CompletionQueue returns events that have occurred, or events that have
74 // been cancelled
76
78
79 // The gRPC service defined by the .proto files
80 org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService service_;
81
83
85
87
89
91
92 // typedef for function to bind a listener
93 // This is always of the form:
94 // org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::Request[RPC NAME]
95 template <class Request, class Response>
97 org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService&,
98 grpc::ServerContext*,
99 Request*,
100 grpc::ServerAsyncResponseWriter<Response>*,
101 grpc::CompletionQueue*,
102 grpc::ServerCompletionQueue*,
103 void*)>;
104
105 // typedef for actual handler (that populates a response)
106 // handlers are defined in rpc/GRPCHandlers.h
107 template <class Request, class Response>
110 // This implementation is currently limited to v1 of the API
111 static unsigned constexpr apiVersion = 1;
112
113 template <class Request, class Response>
114 using Forward = std::function<grpc::Status(
115 org::xrpl::rpc::v1::XRPLedgerAPIService::Stub*,
116 grpc::ClientContext*,
117 Request,
118 Response*)>;
119
120public:
121 explicit GRPCServerImpl(Application& app);
122
124
126 operator=(const GRPCServerImpl&) = delete;
127
128 void
129 shutdown();
130
131 // setup the server and listeners
132 // returns true if server started successfully
133 bool
134 start();
135
136 // the main event loop
137 void
138 handleRpcs();
139
140 // Create a CallData object for each RPC. Return created objects in vector
143
144private:
145 // Class encompasing the state and logic needed to serve a request.
146 template <class Request, class Response>
148 : public Processor,
149 public std::enable_shared_from_this<CallData<Request, Response>>
150 {
151 private:
152 // The means of communication with the gRPC runtime for an asynchronous
153 // server.
154 org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService& service_;
155
156 // The producer-consumer queue for asynchronous server notifications.
157 grpc::ServerCompletionQueue& cq_;
158
159 // Context for the rpc, allowing to tweak aspects of it such as the use
160 // of compression, authentication, as well as to send metadata back to
161 // the client.
162 grpc::ServerContext ctx_;
163
164 // true if finished processing request
165 // Note, this variable does not need to be atomic, since it is
166 // currently only accessed from one thread. However, isFinished(),
167 // which returns the value of this variable, is public facing. In the
168 // interest of avoiding future concurrency bugs, we make it atomic.
170
172
173 // What we get from the client.
174 Request request_;
175
176 // The means to get back to the client.
177 grpc::ServerAsyncResponseWriter<Response> responder_;
178
179 // Function that creates a listener for specific request type
181
182 // Function that processes a request
184
185 // Function to call to forward to another server
187
188 // Condition required for this RPC
190
191 // Load type for this RPC
193
195
196 public:
197 virtual ~CallData() = default;
198
199 // Take in the "service" instance (in this case representing an
200 // asynchronous server) and the completion queue "cq" used for
201 // asynchronous communication with the gRPC runtime.
202 explicit CallData(
203 org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService& service,
204 grpc::ServerCompletionQueue& cq,
205 Application& app,
209 RPC::Condition requiredCondition,
210 Resource::Charge loadType,
211 std::vector<boost::asio::ip::address> const& secureGatewayIPs);
212
213 CallData(const CallData&) = delete;
214
215 CallData&
216 operator=(const CallData&) = delete;
217
218 virtual void
219 process() override;
220
221 virtual bool
222 isFinished() override;
223
225 clone() override;
226
227 private:
228 // process the request. Called inside the coroutine passed to JobQueue
229 void
231
232 // return load type of this RPC
234 getLoadType();
235
236 // return the Role used for this RPC
237 Role
238 getRole(bool isUnlimited);
239
240 // register endpoint with ResourceManager and return usage
242 getUsage();
243
244 // Returns the ip of the client
245 // Empty optional if there was an error decoding the client ip
248
249 // Returns the endpoint of the client.
250 // Empty optional if there was an error decoding the client
251 // endpoint
254
255 // If the request was proxied through
256 // another rippled node, returns the ip of the originating client.
257 // Empty optional if request was not proxied or there was an error
258 // decoding the client ip
261
262 // If the request was proxied through
263 // another rippled node, returns the endpoint of the originating client.
264 // Empty optional if request was not proxied or there was an error
265 // decoding the client endpoint
268
269 // Returns the user specified in the request. Empty optional if no user
270 // was specified
272 getUser();
273
274 // Sets is_unlimited in response to value of clientIsUnlimited
275 // Does nothing if is_unlimited is not a field of the response
276 void
277 setIsUnlimited(Response& response, bool isUnlimited);
278
279 // True if the client is exempt from resource controls
280 bool
282
283 // True if the request was proxied through another rippled node prior
284 // to arriving here
285 bool
287
288 // forward request to a p2p node
289 void
291
292 }; // CallData
293
294}; // GRPCServerImpl
295
297{
298public:
299 explicit GRPCServer(Application& app) : impl_(app)
300 {
301 }
302
303 GRPCServer(const GRPCServer&) = delete;
304
306 operator=(const GRPCServer&) = delete;
307
308 void
309 start();
310
311 void
312 stop();
313
314 ~GRPCServer();
315
316private:
319 bool running_ = false;
320};
321} // namespace ripple
322#endif
A generic endpoint for log messages.
Definition: Journal.h:59
Forward< Request, Response > forward_
Definition: GRPCServer.h:186
Resource::Consumer getUsage()
Definition: GRPCServer.cpp:310
std::optional< std::string > getUser()
Definition: GRPCServer.cpp:243
grpc::ServerCompletionQueue & cq_
Definition: GRPCServer.h:157
grpc::ServerContext ctx_
Definition: GRPCServer.h:162
Handler< Request, Response > handler_
Definition: GRPCServer.h:183
Resource::Charge getLoadType()
Definition: GRPCServer.cpp:226
void setIsUnlimited(Response &response, bool isUnlimited)
Definition: GRPCServer.cpp:294
std::optional< boost::asio::ip::address > getClientIpAddress()
Definition: GRPCServer.cpp:259
std::optional< boost::asio::ip::tcp::endpoint > getProxiedClientEndpoint()
CallData & operator=(const CallData &)=delete
BindListener< Request, Response > bindListener_
Definition: GRPCServer.h:180
grpc::ServerAsyncResponseWriter< Response > responder_
Definition: GRPCServer.h:177
Role getRole(bool isUnlimited)
Definition: GRPCServer.cpp:233
std::vector< boost::asio::ip::address > const & secureGatewayIPs_
Definition: GRPCServer.h:194
std::shared_ptr< Processor > clone() override
Definition: GRPCServer.cpp:89
CallData(const CallData &)=delete
std::optional< boost::asio::ip::tcp::endpoint > getClientEndpoint()
Definition: GRPCServer.cpp:269
std::optional< boost::asio::ip::address > getProxiedClientIpAddress()
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService & service_
Definition: GRPCServer.h:154
virtual void process() override
Definition: GRPCServer.cpp:105
virtual bool isFinished() override
Definition: GRPCServer.cpp:219
void forwardToP2p(RPC::GRPCContext< Request > &context)
std::string serverAddress_
Definition: GRPCServer.h:86
Application & app_
Definition: GRPCServer.h:84
std::vector< std::shared_ptr< Processor > > setupListeners()
Definition: GRPCServer.cpp:478
GRPCServerImpl & operator=(const GRPCServerImpl &)=delete
std::unique_ptr< grpc::Server > server_
Definition: GRPCServer.h:82
std::unique_ptr< grpc::ServerCompletionQueue > cq_
Definition: GRPCServer.h:75
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService service_
Definition: GRPCServer.h:80
std::vector< std::shared_ptr< Processor > > requests_
Definition: GRPCServer.h:77
static unsigned constexpr apiVersion
Definition: GRPCServer.h:111
std::vector< boost::asio::ip::address > secureGatewayIPs_
Definition: GRPCServer.h:88
GRPCServerImpl(const GRPCServerImpl &)=delete
beast::Journal journal_
Definition: GRPCServer.h:90
GRPCServer(Application &app)
Definition: GRPCServer.h:299
GRPCServer & operator=(const GRPCServer &)=delete
std::thread thread_
Definition: GRPCServer.h:318
GRPCServer(const GRPCServer &)=delete
GRPCServerImpl impl_
Definition: GRPCServer.h:317
virtual void process()=0
Processor(const Processor &)=delete
virtual bool isFinished()=0
Processor & operator=(const Processor &)=delete
Processor()=default
virtual std::shared_ptr< Processor > clone()=0
virtual ~Processor()=default
A consumption charge.
Definition: Charge.h:31
An endpoint that consumes resources.
Definition: Consumer.h:35
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: algorithm.h:26
bool isUnlimited(Role const &role)
ADMIN and IDENTIFIED roles shall have unlimited resources.
Definition: Role.cpp:124
Role
Indicates the level of administrative permission to grant.
Definition: Role.h:43