rippled
Loading...
Searching...
No Matches
RPCSub.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/net/RPCCall.h>
21#include <xrpld/net/RPCSub.h>
22
23#include <xrpl/basics/Log.h>
24#include <xrpl/basics/StringUtilities.h>
25#include <xrpl/basics/contract.h>
26#include <xrpl/json/to_string.h>
27
28#include <deque>
29
30namespace ripple {
31
32// Subscription object for JSON-RPC
// Concrete RPCSub that queues JSON events and delivers them, one at a time,
// to the HTTP or HTTPS endpoint described by the URL given at construction.
// NOTE(review): several statements of this class (lock acquisition, queue
// push/pop, the job-queue and network call heads, most member declarations)
// are not visible in this listing; comments below describe only what is shown.
33class RPCSubImp : public RPCSub
34{
35public:
   // Constructor: stores the collaborators and credentials, then parses
   // strUrl to derive host, port, path and whether SSL is used.
   // Throws std::runtime_error if the URL cannot be parsed or its scheme is
   // neither "http" nor "https".
37 InfoSub::Source& source,
38 boost::asio::io_service& io_service,
39 JobQueue& jobQueue,
40 std::string const& strUrl,
41 std::string const& strUsername,
42 std::string const& strPassword,
43 Logs& logs)
44 : RPCSub(source)
45 , m_io_service(io_service)
46 , m_jobQueue(jobQueue)
47 , mUrl(strUrl)
48 , mSSL(false)
49 , mUsername(strUsername)
50 , mPassword(strPassword)
51 , mSending(false)
52 , j_(logs.journal("RPCSub"))
53 , logs_(logs)
54 {
55 parsedURL pUrl;
56
   // Validate the URL and its scheme; "https" switches SSL on.
57 if (!parseUrl(pUrl, strUrl))
58 Throw<std::runtime_error>("Failed to parse url.");
59 else if (pUrl.scheme == "https")
60 mSSL = true;
61 else if (pUrl.scheme != "http")
62 Throw<std::runtime_error>("Only http and https is supported.");
63
   // Sequence numbers start at 1.
64 mSeq = 1;
65
66 mIp = pUrl.domain;
   // When the URL carries no port, fall back to 443 for SSL and 80 otherwise.
67 mPort = (!pUrl.port) ? (mSSL ? 443 : 80) : *pUrl.port;
68 mPath = pUrl.path;
69
70 JLOG(j_.info()) << "RPCCall::fromNetwork sub: ip=" << mIp
71 << " port=" << mPort
72 << " ssl= " << (mSSL ? "yes" : "no") << " path='"
73 << mPath << "'";
74 }
75
76 ~RPCSubImp() = default;
77
   // Queue an event for delivery and make sure a sender job is running.
   // jvObj     - the JSON event.
   // broadcast - selects the log level of the "push" message: debug when
   //             true, info when false.
78 void
79 send(Json::Value const& jvObj, bool broadcast) override
80 {
82
   // Queue is at capacity: a warning is logged.
   // NOTE(review): the statement that actually discards an entry is not
   // visible in this listing - confirm which end of the queue is dropped.
83 if (mDeque.size() >= eventQueueMax)
84 {
85 // Drop the previous event.
86 JLOG(j_.warn()) << "RPCCall::fromNetwork drop";
88 }
89
90 auto jm = broadcast ? j_.debug() : j_.info();
91 JLOG(jm) << "RPCCall::fromNetwork push: " << jvObj;
92
   // NOTE(review): the enqueue statement itself is not visible here.
94
   // Only start a sender when none is flagged as active.
95 if (!mSending)
96 {
97 // Start a sending thread.
98 JLOG(j_.info()) << "RPCCall::fromNetwork start";
99
   // The job is named "RPCSub::sendThread", has type jtCLIENT_SUBSCRIBE,
   // and its lambda captures `this` to call sendThread().
   // NOTE(review): the call head is not visible; presumably
   // m_jobQueue.addJob(...) - confirm. Nothing shown here sets mSending.
101 jtCLIENT_SUBSCRIBE, "RPCSub::sendThread", [this]() {
102 sendThread();
103 });
104 }
105 }
106
   // Replace the username used for subsequent deliveries.
107 void
108 setUsername(std::string const& strUsername) override
109 {
111
112 mUsername = strUsername;
113 }
114
   // Replace the password used for subsequent deliveries.
115 void
116 setPassword(std::string const& strPassword) override
117 {
119
120 mPassword = strPassword;
121 }
122
123private:
124 // XXX Could probably create a bunch of send jobs in a single get of the
125 // lock.
   // Sender loop: repeatedly takes the front queued event and sends it,
   // stopping (and clearing mSending) once the queue is found empty.
126 void
128 {
129 Json::Value jvEvent;
130 bool bSend;
131
132 do
133 {
134 {
135 // Obtain the lock to manipulate the queue and change sending.
   // NOTE(review): the lock statement is not visible in this listing.
137
138 if (mDeque.empty())
139 {
   // Nothing left to deliver: mark the sender inactive and end the loop.
140 mSending = false;
141 bSend = false;
142 }
143 else
144 {
   // Structured binding by value: copies the front (seq, event) pair.
145 auto const [seq, env] = mDeque.front();
146
   // NOTE(review): removal of the front entry is not visible here.
148
   // Attach the sequence number to the outgoing event under "seq".
149 jvEvent = env;
150 jvEvent["seq"] = seq;
151
152 bSend = true;
153 }
154 }
155
156 // Send outside of the lock.
157 if (bSend)
158 {
159 // XXX Might not need this in a try.
160 try
161 {
162 JLOG(j_.info()) << "RPCCall::fromNetwork: " << mIp;
163
   // NOTE(review): the call head is not visible; the arguments below
   // line up with RPCCall::fromNetwork's parameters after io_service
   // (ip, port, username, password, path, method "event", params,
   // bSSL, quiet = true, logs) - confirm.
166 mIp,
167 mPort,
168 mUsername,
169 mPassword,
170 mPath,
171 "event",
172 jvEvent,
173 mSSL,
174 true,
175 logs_);
176 }
   // A std::exception raised while sending is logged at info level and
   // swallowed, so the loop proceeds to the next queued event.
177 catch (const std::exception& e)
178 {
179 JLOG(j_.info())
180 << "RPCCall::fromNetwork exception: " << e.what();
181 }
182 }
183 } while (bSend);
184 }
185
186private:
   // Queue-size threshold checked in send() before a new event is queued.
187 enum { eventQueueMax = 32 };
188
189 boost::asio::io_service& m_io_service;
191
   // True when the URL scheme is "https".
195 bool mSSL;
199
200 int mSeq; // Next id to allocate.
201
202 bool mSending; // Sending thread is active.
203
205
208};
209
210//------------------------------------------------------------------------------
211
213{
214}
215
218 InfoSub::Source& source,
219 boost::asio::io_service& io_service,
220 JobQueue& jobQueue,
221 std::string const& strUrl,
222 std::string const& strUsername,
223 std::string const& strPassword,
224 Logs& logs)
225{
226 return std::make_shared<RPCSubImp>(
227 std::ref(source),
228 std::ref(io_service),
229 std::ref(jobQueue),
230 strUrl,
231 strUsername,
232 strPassword,
233 logs);
234}
235
236} // namespace ripple
Represents a JSON value.
Definition: json_value.h:148
A generic endpoint for log messages.
Definition: Journal.h:60
Stream debug() const
Definition: Journal.h:328
Stream info() const
Definition: Journal.h:334
Stream warn() const
Definition: Journal.h:340
Abstracts the source of subscription data.
Definition: InfoSub.h:68
Manages a client's subscription to data feeds.
Definition: InfoSub.h:52
std::mutex mLock
Definition: InfoSub.h:239
A pool of threads to perform work.
Definition: JobQueue.h:56
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition: JobQueue.h:166
Manages partitions for logging.
Definition: Log.h:51
boost::asio::io_service & m_io_service
Definition: RPCSub.cpp:189
~RPCSubImp()=default
std::string mIp
Definition: RPCSub.cpp:193
std::deque< std::pair< int, Json::Value > > mDeque
Definition: RPCSub.cpp:204
void setPassword(std::string const &strPassword) override
Definition: RPCSub.cpp:116
std::string mUsername
Definition: RPCSub.cpp:196
void sendThread()
Definition: RPCSub.cpp:127
beast::Journal const j_
Definition: RPCSub.cpp:206
std::uint16_t mPort
Definition: RPCSub.cpp:194
JobQueue & m_jobQueue
Definition: RPCSub.cpp:190
RPCSubImp(InfoSub::Source &source, boost::asio::io_service &io_service, JobQueue &jobQueue, std::string const &strUrl, std::string const &strUsername, std::string const &strPassword, Logs &logs)
Definition: RPCSub.cpp:36
void setUsername(std::string const &strUsername) override
Definition: RPCSub.cpp:108
std::string mPath
Definition: RPCSub.cpp:198
void send(Json::Value const &jvObj, bool broadcast) override
Definition: RPCSub.cpp:79
std::string mPassword
Definition: RPCSub.cpp:197
std::string mUrl
Definition: RPCSub.cpp:192
Subscription object for JSON RPC.
Definition: RPCSub.h:32
RPCSub(InfoSub::Source &source)
Definition: RPCSub.cpp:212
An endpoint that consumes resources.
Definition: Consumer.h:35
T empty(T... args)
T front(T... args)
T make_pair(T... args)
void fromNetwork(boost::asio::io_service &io_service, std::string const &strIp, const std::uint16_t iPort, std::string const &strUsername, std::string const &strPassword, std::string const &strPath, std::string const &strMethod, Json::Value const &jvParams, const bool bSSL, const bool quiet, Logs &logs, std::function< void(Json::Value const &jvInput)> callbackFuncP, std::unordered_map< std::string, std::string > headers)
Definition: RPCCall.cpp:1631
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: algorithm.h:26
std::shared_ptr< RPCSub > make_RPCSub(InfoSub::Source &source, boost::asio::io_service &io_service, JobQueue &jobQueue, std::string const &strUrl, std::string const &strUsername, std::string const &strPassword, Logs &logs)
Definition: RPCSub.cpp:217
bool parseUrl(parsedURL &pUrl, std::string const &strUrl)
@ jtCLIENT_SUBSCRIBE
Definition: Job.h:46
T pop_back(T... args)
T pop_front(T... args)
T push_back(T... args)
T ref(T... args)
T size(T... args)
std::optional< std::uint16_t > port
T what(T... args)