rippled
Loading...
Searching...
No Matches
RPCSub.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/net/RPCCall.h>
21#include <xrpld/net/RPCSub.h>
22
23#include <xrpl/basics/Log.h>
24#include <xrpl/basics/StringUtilities.h>
25#include <xrpl/basics/contract.h>
26#include <xrpl/json/to_string.h>
27
28#include <deque>
29
30namespace ripple {
31
32// Subscription object for JSON-RPC
// RPCSubImp is the RPCSub subclass defined in this file. Its constructor
// parses a target URL into host, port, path and an SSL flag; the drain loop
// further down takes events from mDeque, stamps each with a "seq" field and
// sends it to that endpoint.
//
// NOTE(review): this listing is an extract with the original line numbers
// fused onto each line, and several numbered lines are absent (36, 81, 86,
// 93, 103, 111, 120, 129, 140, 157-158, 181, 183-185, 187-189, 195,
// 197-198). Comments below describe only what is visible; anything said
// about a missing line is a presumption to confirm against the repository.
33class RPCSubImp : public RPCSub
34{
35public:
    // Constructor (its name line, listing line 36, is missing from this
    // extract). Stores the references and credentials, then parses strUrl.
    // Raises std::runtime_error through Throw<> when parseUrl fails or when
    // the scheme is neither "http" nor "https".
37 InfoSub::Source& source,
38 boost::asio::io_service& io_service,
39 JobQueue& jobQueue,
40 std::string const& strUrl,
41 std::string const& strUsername,
42 std::string const& strPassword,
43 Logs& logs)
44 : RPCSub(source)
45 , m_io_service(io_service)
46 , m_jobQueue(jobQueue)
47 , mUrl(strUrl)
48 , mSSL(false)
49 , mUsername(strUsername)
50 , mPassword(strPassword)
51 , mSending(false)
52 , j_(logs.journal("RPCSub"))
53 , logs_(logs)
54 {
55 parsedURL pUrl;
56
    // Scheme check: "https" turns SSL on, "http" leaves it off, anything
    // else is rejected.
57 if (!parseUrl(pUrl, strUrl))
58 Throw<std::runtime_error>("Failed to parse url.");
59 else if (pUrl.scheme == "https")
60 mSSL = true;
61 else if (pUrl.scheme != "http")
62 Throw<std::runtime_error>("Only http and https is supported.");
63
    // Sequence numbers start at 1.
64 mSeq = 1;
65
66 mIp = pUrl.domain;
    // No explicit port in the URL: fall back to 443 for https, 80 for http.
67 mPort = (!pUrl.port) ? (mSSL ? 443 : 80) : *pUrl.port;
68 mPath = pUrl.path;
69
70 JLOG(j_.info()) << "RPCCall::fromNetwork sub: ip=" << mIp
71 << " port=" << mPort
72 << " ssl= " << (mSSL ? "yes" : "no") << " path='"
73 << mPath << "'";
74 }
75
76 ~RPCSubImp() = default;
77
    // Logs the event (debug level for broadcasts, info otherwise) and, when
    // no sender is marked active, logs a "start" message.
    // NOTE(review): listing lines 81, 86 and 93 are missing here. The
    // cross-reference index at the end of this page lists mLock, make_pair,
    // push_back and JobQueue::addJob, so presumably the lock is taken, the
    // event is queued with its sequence number, and sendThread() is submitted
    // as a jtCLIENT_SUBSCRIBE job on those lines -- confirm.
78 void
79 send(Json::Value const& jvObj, bool broadcast) override
80 {
82
83 auto jm = broadcast ? j_.debug() : j_.info();
84 JLOG(jm) << "RPCCall::fromNetwork push: " << jvObj;
85
87
88 if (!mSending)
89 {
90 // Start a sending thread.
91 JLOG(j_.info()) << "RPCCall::fromNetwork start";
92
    // NOTE(review): the callback captures a raw `this`; confirm the object
    // is guaranteed to outlive the queued job.
94 jtCLIENT_SUBSCRIBE, "RPCSub::sendThread", [this]() {
95 sendThread();
96 });
97 }
98 }
99
    // Replaces the stored username.
    // NOTE(review): listing line 103 is missing -- presumably a lock
    // acquisition; confirm.
100 void
101 setUsername(std::string const& strUsername) override
102 {
104
105 mUsername = strUsername;
106 }
107
    // Replaces the stored password.
    // NOTE(review): listing line 111 is missing -- presumably a lock
    // acquisition; confirm.
108 void
109 setPassword(std::string const& strPassword) override
110 {
112
113 mPassword = strPassword;
114 }
115
116private:
117 // XXX Could probably create a bunch of send jobs in a single get of the
118 // lock.
    // Drain loop (the function-name line, listing line 120, is missing; the
    // index at the end of this page names it sendThread()). Each pass looks
    // at mDeque: if it is empty, mSending is cleared and the loop ends;
    // otherwise the front entry is copied into jvEvent, stamped with "seq",
    // and sent.
119 void
121 {
122 Json::Value jvEvent;
123 bool bSend;
124
125 do
126 {
127 {
128 // Obtain the lock to manipulate the queue and change sending.
    // NOTE(review): the lock acquisition itself (listing line 129) is
    // missing from this extract.
130
131 if (mDeque.empty())
132 {
133 mSending = false;
134 bSend = false;
135 }
136 else
137 {
    // `auto const [seq, env]` binds by value, so the front pair is
    // copied here and the payload is copied again into jvEvent.
    // NOTE(review): listing line 140 is missing -- presumably the
    // pop_front() named in the index; confirm.
138 auto const [seq, env] = mDeque.front();
139
141
142 jvEvent = env;
143 jvEvent["seq"] = seq;
144
145 bSend = true;
146 }
147 }
148
149 // Send outside of the lock.
150 if (bSend)
151 {
152 // XXX Might not need this in a try.
153 try
154 {
155 JLOG(j_.info()) << "RPCCall::fromNetwork: " << mIp;
156
    // NOTE(review): the head of this call (listing lines 157-158) is
    // missing. The arguments line up with the fromNetwork signature in
    // the index, which would make "event" the method name and `true`
    // the `quiet` flag -- confirm.
159 mIp,
160 mPort,
161 mUsername,
162 mPassword,
163 mPath,
164 "event",
165 jvEvent,
166 mSSL,
167 true,
168 logs_);
169 }
    // Anything derived from std::exception is logged at info level and
    // swallowed, so the loop continues.
170 catch (std::exception const& e)
171 {
172 JLOG(j_.info())
173 << "RPCCall::fromNetwork exception: " << e.what();
174 }
175 }
176 } while (bSend);
177 }
178
179private:
    // NOTE(review): most member declarations are missing from this extract.
    // The index at the end of this page lists m_jobQueue, mUrl, mIp, mPort,
    // mUsername, mPassword, mPath, mDeque and j_ on the absent lines.
180 boost::asio::io_service& m_io_service;
182
186 bool mSSL;
190
191 int mSeq; // Next id to allocate.
192
193 bool mSending; // Sending thread is active.
194
196
199};
200
201//------------------------------------------------------------------------------
202
// Empty body of the RPCSub constructor.
// NOTE(review): its signature line (listing line 203) is missing from this
// extract; the index at the end of the page gives it as
// RPCSub(InfoSub::Source& source). Confirm any base-class initialisation
// against the repository.
204{
205}
206
// Parameter list and body of make_RPCSub.
// NOTE(review): the return-type and name lines (listing lines 207-208) are
// missing from this extract; the index at the end of the page gives the
// return type as std::shared_ptr<RPCSub>.
//
// Builds an RPCSubImp with std::make_shared, passing every argument through;
// source, io_service and jobQueue are wrapped in std::ref. Nothing here
// catches exceptions, so a failure raised while constructing RPCSubImp
// reaches the caller.
209 InfoSub::Source& source,
210 boost::asio::io_service& io_service,
211 JobQueue& jobQueue,
212 std::string const& strUrl,
213 std::string const& strUsername,
214 std::string const& strPassword,
215 Logs& logs)
216{
217 return std::make_shared<RPCSubImp>(
218 std::ref(source),
219 std::ref(io_service),
220 std::ref(jobQueue),
221 strUrl,
222 strUsername,
223 strPassword,
224 logs);
225}
226
227} // namespace ripple
Represents a JSON value.
Definition: json_value.h:150
A generic endpoint for log messages.
Definition: Journal.h:60
Stream debug() const
Definition: Journal.h:328
Stream info() const
Definition: Journal.h:334
Abstracts the source of subscription data.
Definition: InfoSub.h:68
Manages a client's subscription to data feeds.
Definition: InfoSub.h:52
std::mutex mLock
Definition: InfoSub.h:239
A pool of threads to perform work.
Definition: JobQueue.h:56
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition: JobQueue.h:166
Manages partitions for logging.
Definition: Log.h:51
boost::asio::io_service & m_io_service
Definition: RPCSub.cpp:180
~RPCSubImp()=default
std::string mIp
Definition: RPCSub.cpp:184
std::deque< std::pair< int, Json::Value > > mDeque
Definition: RPCSub.cpp:195
void setPassword(std::string const &strPassword) override
Definition: RPCSub.cpp:109
std::string mUsername
Definition: RPCSub.cpp:187
void sendThread()
Definition: RPCSub.cpp:120
beast::Journal const j_
Definition: RPCSub.cpp:197
std::uint16_t mPort
Definition: RPCSub.cpp:185
JobQueue & m_jobQueue
Definition: RPCSub.cpp:181
RPCSubImp(InfoSub::Source &source, boost::asio::io_service &io_service, JobQueue &jobQueue, std::string const &strUrl, std::string const &strUsername, std::string const &strPassword, Logs &logs)
Definition: RPCSub.cpp:36
void setUsername(std::string const &strUsername) override
Definition: RPCSub.cpp:101
std::string mPath
Definition: RPCSub.cpp:189
void send(Json::Value const &jvObj, bool broadcast) override
Definition: RPCSub.cpp:79
std::string mPassword
Definition: RPCSub.cpp:188
std::string mUrl
Definition: RPCSub.cpp:183
Subscription object for JSON RPC.
Definition: RPCSub.h:32
RPCSub(InfoSub::Source &source)
Definition: RPCSub.cpp:203
An endpoint that consumes resources.
Definition: Consumer.h:35
T empty(T... args)
T front(T... args)
T make_pair(T... args)
void fromNetwork(boost::asio::io_service &io_service, std::string const &strIp, std::uint16_t const iPort, std::string const &strUsername, std::string const &strPassword, std::string const &strPath, std::string const &strMethod, Json::Value const &jvParams, bool const bSSL, bool const quiet, Logs &logs, std::function< void(Json::Value const &jvInput)> callbackFuncP, std::unordered_map< std::string, std::string > headers)
Definition: RPCCall.cpp:1650
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: algorithm.h:26
std::shared_ptr< RPCSub > make_RPCSub(InfoSub::Source &source, boost::asio::io_service &io_service, JobQueue &jobQueue, std::string const &strUrl, std::string const &strUsername, std::string const &strPassword, Logs &logs)
Definition: RPCSub.cpp:208
bool parseUrl(parsedURL &pUrl, std::string const &strUrl)
@ jtCLIENT_SUBSCRIBE
Definition: Job.h:46
T pop_front(T... args)
T push_back(T... args)
T ref(T... args)
std::optional< std::uint16_t > port
T what(T... args)