rippled
Loading...
Searching...
No Matches
WSClient.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2016 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <test/jtx.h>
21#include <test/jtx/WSClient.h>
22
23#include <xrpl/json/json_reader.h>
24#include <xrpl/json/to_string.h>
25#include <xrpl/protocol/jss.h>
26#include <xrpl/server/Port.h>
27
28#include <boost/beast/core/multi_buffer.hpp>
29#include <boost/beast/websocket.hpp>
30
31#include <iostream>
32#include <string>
33#include <unordered_map>
34
35namespace ripple {
36namespace test {
37
38class WSClientImpl : public WSClient
39{
40 using error_code = boost::system::error_code;
41
42 struct msg
43 {
45
46 explicit msg(Json::Value&& jv_) : jv(jv_)
47 {
48 }
49 };
50
51 static boost::asio::ip::tcp::endpoint
52 getEndpoint(BasicConfig const& cfg, bool v2)
53 {
54 auto& log = std::cerr;
55 ParsedPort common;
56 parse_Port(common, cfg["server"], log);
57 auto const ps = v2 ? "ws2" : "ws";
58 for (auto const& name : cfg.section("server").values())
59 {
60 if (!cfg.exists(name))
61 continue;
62 ParsedPort pp;
63 parse_Port(pp, cfg[name], log);
64 if (pp.protocol.count(ps) == 0)
65 continue;
66 using namespace boost::asio::ip;
67 if (pp.ip && pp.ip->is_unspecified())
68 *pp.ip = pp.ip->is_v6() ? address{address_v6::loopback()}
69 : address{address_v4::loopback()};
70
71 if (!pp.port)
72 Throw<std::runtime_error>("Use fixConfigPorts with auto ports");
73
74 return {*pp.ip, *pp.port};
75 }
76 Throw<std::runtime_error>("Missing WebSocket port");
77 return {}; // Silence compiler control paths return value warning
78 }
79
80 template <class ConstBuffers>
81 static std::string
82 buffer_string(ConstBuffers const& b)
83 {
84 using boost::asio::buffer;
85 using boost::asio::buffer_size;
87 s.resize(buffer_size(b));
88 buffer_copy(buffer(&s[0], s.size()), b);
89 return s;
90 }
91
92 boost::asio::io_service ios_;
94 boost::asio::io_service::strand strand_;
96 boost::asio::ip::tcp::socket stream_;
97 boost::beast::websocket::stream<boost::asio::ip::tcp::socket&> ws_;
98 boost::beast::multi_buffer rb_;
99
100 bool peerClosed_ = false;
101
102 // synchronize destructor
103 bool b0_ = false;
106
107 // synchronize message queue
111
112 unsigned rpc_version_;
113
114 void
116 {
117 ios_.post(strand_.wrap([this] {
118 if (!peerClosed_)
119 {
120 ws_.async_close({}, strand_.wrap([&](error_code ec) {
121 stream_.cancel(ec);
122 }));
123 }
124 }));
125 work_ = std::nullopt;
126 thread_.join();
127 }
128
129public:
131 Config const& cfg,
132 bool v2,
133 unsigned rpc_version,
135 : work_(ios_)
136 , strand_(ios_)
137 , thread_([&] { ios_.run(); })
138 , stream_(ios_)
139 , ws_(stream_)
140 , rpc_version_(rpc_version)
141 {
142 try
143 {
144 auto const ep = getEndpoint(cfg, v2);
145 stream_.connect(ep);
146 ws_.set_option(boost::beast::websocket::stream_base::decorator(
147 [&](boost::beast::websocket::request_type& req) {
148 for (auto const& h : headers)
149 req.set(h.first, h.second);
150 }));
151 ws_.handshake(
152 ep.address().to_string() + ":" + std::to_string(ep.port()),
153 "/");
154 ws_.async_read(
155 rb_,
156 strand_.wrap(std::bind(
157 &WSClientImpl::on_read_msg, this, std::placeholders::_1)));
158 }
159 catch (std::exception&)
160 {
161 cleanup();
162 Rethrow();
163 }
164 }
165
166 ~WSClientImpl() override
167 {
168 cleanup();
169 }
170
172 invoke(std::string const& cmd, Json::Value const& params) override
173 {
174 using boost::asio::buffer;
175 using namespace std::chrono_literals;
176
177 {
178 Json::Value jp;
179 if (params)
180 jp = params;
181 if (rpc_version_ == 2)
182 {
183 jp[jss::method] = cmd;
184 jp[jss::jsonrpc] = "2.0";
185 jp[jss::ripplerpc] = "2.0";
186 jp[jss::id] = 5;
187 }
188 else
189 jp[jss::command] = cmd;
190 auto const s = to_string(jp);
191 ws_.write_some(true, buffer(s));
192 }
193
194 auto jv = findMsg(5s, [&](Json::Value const& jval) {
195 return jval[jss::type] == jss::response;
196 });
197 if (jv)
198 {
199 // Normalize JSON output
200 jv->removeMember(jss::type);
201 if ((*jv).isMember(jss::status) && (*jv)[jss::status] == jss::error)
202 {
203 Json::Value ret;
204 ret[jss::result] = *jv;
205 if ((*jv).isMember(jss::error))
206 ret[jss::error] = (*jv)[jss::error];
207 ret[jss::status] = jss::error;
208 return ret;
209 }
210 if ((*jv).isMember(jss::status) && (*jv).isMember(jss::result))
211 (*jv)[jss::result][jss::status] = (*jv)[jss::status];
212 return *jv;
213 }
214 return {};
215 }
216
218 getMsg(std::chrono::milliseconds const& timeout) override
219 {
221 {
223 if (!cv_.wait_for(lock, timeout, [&] { return !msgs_.empty(); }))
224 return std::nullopt;
225 m = std::move(msgs_.back());
226 msgs_.pop_back();
227 }
228 return std::move(m->jv);
229 }
230
233 std::chrono::milliseconds const& timeout,
234 std::function<bool(Json::Value const&)> pred) override
235 {
237 {
239 if (!cv_.wait_for(lock, timeout, [&] {
240 for (auto it = msgs_.begin(); it != msgs_.end(); ++it)
241 {
242 if (pred((*it)->jv))
243 {
244 m = std::move(*it);
245 msgs_.erase(it);
246 return true;
247 }
248 }
249 return false;
250 }))
251 {
252 return std::nullopt;
253 }
254 }
255 return std::move(m->jv);
256 }
257
258 unsigned
259 version() const override
260 {
261 return rpc_version_;
262 }
263
264private:
265 void
267 {
268 if (ec)
269 {
270 if (ec == boost::beast::websocket::error::closed)
271 peerClosed_ = true;
272 return;
273 }
274
275 Json::Value jv;
276 Json::Reader jr;
277 jr.parse(buffer_string(rb_.data()), jv);
278 rb_.consume(rb_.size());
279 auto m = std::make_shared<msg>(std::move(jv));
280 {
281 std::lock_guard lock(m_);
282 msgs_.push_front(m);
283 cv_.notify_all();
284 }
285 ws_.async_read(
286 rb_,
287 strand_.wrap(std::bind(
288 &WSClientImpl::on_read_msg, this, std::placeholders::_1)));
289 }
290
291 // Called when the read op terminates
292 void
294 {
295 std::lock_guard lock(m0_);
296 b0_ = true;
297 cv0_.notify_all();
298 }
299};
300
303 Config const& cfg,
304 bool v2,
305 unsigned rpc_version,
307{
308 return std::make_unique<WSClientImpl>(cfg, v2, rpc_version, headers);
309}
310
311} // namespace test
312} // namespace ripple
T back(T... args)
T bind(T... args)
Unserialize a JSON document into a Value.
Definition: json_reader.h:39
bool parse(std::string const &document, Value &root)
Read a Value from a JSON document.
Definition: json_reader.cpp:78
Represents a JSON value.
Definition: json_value.h:148
Holds unparsed configuration information.
Definition: BasicConfig.h:218
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Section & section(std::string const &name)
Returns the section with the given name.
std::vector< std::string > const & values() const
Returns all the values in the section.
Definition: BasicConfig.h:79
unsigned version() const override
Get RPC 1.0 or RPC 2.0.
Definition: WSClient.cpp:259
WSClientImpl(Config const &cfg, bool v2, unsigned rpc_version, std::unordered_map< std::string, std::string > const &headers={})
Definition: WSClient.cpp:130
boost::system::error_code error_code
Definition: WSClient.cpp:40
std::condition_variable cv0_
Definition: WSClient.cpp:105
static std::string buffer_string(ConstBuffers const &b)
Definition: WSClient.cpp:82
boost::beast::multi_buffer rb_
Definition: WSClient.cpp:98
std::optional< Json::Value > getMsg(std::chrono::milliseconds const &timeout) override
Retrieve a message.
Definition: WSClient.cpp:218
Json::Value invoke(std::string const &cmd, Json::Value const &params) override
Submit a command synchronously.
Definition: WSClient.cpp:172
static boost::asio::ip::tcp::endpoint getEndpoint(BasicConfig const &cfg, bool v2)
Definition: WSClient.cpp:52
void on_read_msg(error_code const &ec)
Definition: WSClient.cpp:266
boost::asio::io_service::strand strand_
Definition: WSClient.cpp:94
boost::beast::websocket::stream< boost::asio::ip::tcp::socket & > ws_
Definition: WSClient.cpp:97
boost::asio::io_service ios_
Definition: WSClient.cpp:92
std::optional< boost::asio::io_service::work > work_
Definition: WSClient.cpp:93
std::list< std::shared_ptr< msg > > msgs_
Definition: WSClient.cpp:110
boost::asio::ip::tcp::socket stream_
Definition: WSClient.cpp:96
std::condition_variable cv_
Definition: WSClient.cpp:109
std::optional< Json::Value > findMsg(std::chrono::milliseconds const &timeout, std::function< bool(Json::Value const &)> pred) override
Retrieve a message that meets the predicate criteria.
Definition: WSClient.cpp:232
T count(T... args)
T join(T... args)
std::unique_ptr< WSClient > makeWSClient(Config const &cfg, bool v2, unsigned rpc_version, std::unordered_map< std::string, std::string > const &headers)
Returns a client operating through WebSockets/S.
Definition: WSClient.cpp:302
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: algorithm.h:26
void parse_Port(ParsedPort &port, Section const &section, std::ostream &log)
Definition: Port.cpp:213
std::string to_string(base_uint< Bits, Tag > const &a)
Definition: base_uint.h:630
void Rethrow()
Rethrow the exception currently being handled.
Definition: contract.h:48
T pop_back(T... args)
T push_front(T... args)
T resize(T... args)
T size(T... args)
std::optional< std::uint16_t > port
Definition: Port.h:116
std::set< std::string, boost::beast::iless > protocol
Definition: Port.h:102
std::optional< boost::asio::ip::address > ip
Definition: Port.h:115
msg(Json::Value &&jv_)
Definition: WSClient.cpp:46
T to_string(T... args)