rippled
Loading...
Searching...
No Matches
WSClient.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2016 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <test/jtx.h>
21#include <test/jtx/WSClient.h>
22#include <xrpl/json/json_reader.h>
23#include <xrpl/json/to_string.h>
24#include <xrpl/protocol/jss.h>
25#include <xrpl/server/Port.h>
26#include <boost/beast/core/multi_buffer.hpp>
27#include <boost/beast/websocket.hpp>
28
29#include <condition_variable>
30#include <string>
31#include <unordered_map>
32
33#include <iostream>
34
35#include <xrpl/beast/unit_test.h>
36
37namespace ripple {
38namespace test {
39
40class WSClientImpl : public WSClient
41{
42 using error_code = boost::system::error_code;
43
44 struct msg
45 {
47
48 explicit msg(Json::Value&& jv_) : jv(jv_)
49 {
50 }
51 };
52
53 static boost::asio::ip::tcp::endpoint
54 getEndpoint(BasicConfig const& cfg, bool v2)
55 {
56 auto& log = std::cerr;
57 ParsedPort common;
58 parse_Port(common, cfg["server"], log);
59 auto const ps = v2 ? "ws2" : "ws";
60 for (auto const& name : cfg.section("server").values())
61 {
62 if (!cfg.exists(name))
63 continue;
64 ParsedPort pp;
65 parse_Port(pp, cfg[name], log);
66 if (pp.protocol.count(ps) == 0)
67 continue;
68 using namespace boost::asio::ip;
69 if (pp.ip && pp.ip->is_unspecified())
70 *pp.ip = pp.ip->is_v6() ? address{address_v6::loopback()}
71 : address{address_v4::loopback()};
72 return {*pp.ip, *pp.port};
73 }
74 Throw<std::runtime_error>("Missing WebSocket port");
75 return {}; // Silence compiler control paths return value warning
76 }
77
78 template <class ConstBuffers>
79 static std::string
80 buffer_string(ConstBuffers const& b)
81 {
82 using boost::asio::buffer;
83 using boost::asio::buffer_size;
85 s.resize(buffer_size(b));
86 buffer_copy(buffer(&s[0], s.size()), b);
87 return s;
88 }
89
90 boost::asio::io_service ios_;
92 boost::asio::io_service::strand strand_;
94 boost::asio::ip::tcp::socket stream_;
95 boost::beast::websocket::stream<boost::asio::ip::tcp::socket&> ws_;
96 boost::beast::multi_buffer rb_;
97
98 bool peerClosed_ = false;
99
100 // synchronize destructor
101 bool b0_ = false;
104
105 // synchronize message queue
109
110 unsigned rpc_version_;
111
112 void
114 {
115 ios_.post(strand_.wrap([this] {
116 if (!peerClosed_)
117 {
118 ws_.async_close({}, strand_.wrap([&](error_code ec) {
119 stream_.cancel(ec);
120 }));
121 }
122 }));
123 work_ = std::nullopt;
124 thread_.join();
125 }
126
127public:
129 Config const& cfg,
130 bool v2,
131 unsigned rpc_version,
133 : work_(ios_)
134 , strand_(ios_)
135 , thread_([&] { ios_.run(); })
136 , stream_(ios_)
137 , ws_(stream_)
138 , rpc_version_(rpc_version)
139 {
140 try
141 {
142 auto const ep = getEndpoint(cfg, v2);
143 stream_.connect(ep);
144 ws_.set_option(boost::beast::websocket::stream_base::decorator(
145 [&](boost::beast::websocket::request_type& req) {
146 for (auto const& h : headers)
147 req.set(h.first, h.second);
148 }));
149 ws_.handshake(
150 ep.address().to_string() + ":" + std::to_string(ep.port()),
151 "/");
152 ws_.async_read(
153 rb_,
154 strand_.wrap(std::bind(
155 &WSClientImpl::on_read_msg, this, std::placeholders::_1)));
156 }
157 catch (std::exception&)
158 {
159 cleanup();
160 Rethrow();
161 }
162 }
163
164 ~WSClientImpl() override
165 {
166 cleanup();
167 }
168
170 invoke(std::string const& cmd, Json::Value const& params) override
171 {
172 using boost::asio::buffer;
173 using namespace std::chrono_literals;
174
175 {
176 Json::Value jp;
177 if (params)
178 jp = params;
179 if (rpc_version_ == 2)
180 {
181 jp[jss::method] = cmd;
182 jp[jss::jsonrpc] = "2.0";
183 jp[jss::ripplerpc] = "2.0";
184 jp[jss::id] = 5;
185 }
186 else
187 jp[jss::command] = cmd;
188 auto const s = to_string(jp);
189 ws_.write_some(true, buffer(s));
190 }
191
192 auto jv = findMsg(5s, [&](Json::Value const& jval) {
193 return jval[jss::type] == jss::response;
194 });
195 if (jv)
196 {
197 // Normalize JSON output
198 jv->removeMember(jss::type);
199 if ((*jv).isMember(jss::status) && (*jv)[jss::status] == jss::error)
200 {
201 Json::Value ret;
202 ret[jss::result] = *jv;
203 if ((*jv).isMember(jss::error))
204 ret[jss::error] = (*jv)[jss::error];
205 ret[jss::status] = jss::error;
206 return ret;
207 }
208 if ((*jv).isMember(jss::status) && (*jv).isMember(jss::result))
209 (*jv)[jss::result][jss::status] = (*jv)[jss::status];
210 return *jv;
211 }
212 return {};
213 }
214
216 getMsg(std::chrono::milliseconds const& timeout) override
217 {
219 {
221 if (!cv_.wait_for(lock, timeout, [&] { return !msgs_.empty(); }))
222 return std::nullopt;
223 m = std::move(msgs_.back());
224 msgs_.pop_back();
225 }
226 return std::move(m->jv);
227 }
228
231 std::chrono::milliseconds const& timeout,
232 std::function<bool(Json::Value const&)> pred) override
233 {
235 {
237 if (!cv_.wait_for(lock, timeout, [&] {
238 for (auto it = msgs_.begin(); it != msgs_.end(); ++it)
239 {
240 if (pred((*it)->jv))
241 {
242 m = std::move(*it);
243 msgs_.erase(it);
244 return true;
245 }
246 }
247 return false;
248 }))
249 {
250 return std::nullopt;
251 }
252 }
253 return std::move(m->jv);
254 }
255
256 unsigned
257 version() const override
258 {
259 return rpc_version_;
260 }
261
262private:
263 void
265 {
266 if (ec)
267 {
268 if (ec == boost::beast::websocket::error::closed)
269 peerClosed_ = true;
270 return;
271 }
272
273 Json::Value jv;
274 Json::Reader jr;
275 jr.parse(buffer_string(rb_.data()), jv);
276 rb_.consume(rb_.size());
277 auto m = std::make_shared<msg>(std::move(jv));
278 {
279 std::lock_guard lock(m_);
280 msgs_.push_front(m);
281 cv_.notify_all();
282 }
283 ws_.async_read(
284 rb_,
285 strand_.wrap(std::bind(
286 &WSClientImpl::on_read_msg, this, std::placeholders::_1)));
287 }
288
289 // Called when the read op terminates
290 void
292 {
293 std::lock_guard lock(m0_);
294 b0_ = true;
295 cv0_.notify_all();
296 }
297};
298
301 Config const& cfg,
302 bool v2,
303 unsigned rpc_version,
305{
306 return std::make_unique<WSClientImpl>(cfg, v2, rpc_version, headers);
307}
308
309} // namespace test
310} // namespace ripple
T back(T... args)
T bind(T... args)
Unserialize a JSON document into a Value.
Definition: json_reader.h:37
bool parse(std::string const &document, Value &root)
Read a Value from a JSON document.
Definition: json_reader.cpp:73
Represents a JSON value.
Definition: json_value.h:147
Holds unparsed configuration information.
Definition: BasicConfig.h:216
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Section & section(std::string const &name)
Returns the section with the given name.
std::vector< std::string > const & values() const
Returns all the values in the section.
Definition: BasicConfig.h:77
unsigned version() const override
Get RPC 1.0 or RPC 2.0.
Definition: WSClient.cpp:257
WSClientImpl(Config const &cfg, bool v2, unsigned rpc_version, std::unordered_map< std::string, std::string > const &headers={})
Definition: WSClient.cpp:128
boost::system::error_code error_code
Definition: WSClient.cpp:42
std::condition_variable cv0_
Definition: WSClient.cpp:103
static std::string buffer_string(ConstBuffers const &b)
Definition: WSClient.cpp:80
boost::beast::multi_buffer rb_
Definition: WSClient.cpp:96
std::optional< Json::Value > getMsg(std::chrono::milliseconds const &timeout) override
Retrieve a message.
Definition: WSClient.cpp:216
Json::Value invoke(std::string const &cmd, Json::Value const &params) override
Submit a command synchronously.
Definition: WSClient.cpp:170
static boost::asio::ip::tcp::endpoint getEndpoint(BasicConfig const &cfg, bool v2)
Definition: WSClient.cpp:54
void on_read_msg(error_code const &ec)
Definition: WSClient.cpp:264
boost::asio::io_service::strand strand_
Definition: WSClient.cpp:92
boost::beast::websocket::stream< boost::asio::ip::tcp::socket & > ws_
Definition: WSClient.cpp:95
boost::asio::io_service ios_
Definition: WSClient.cpp:90
std::optional< boost::asio::io_service::work > work_
Definition: WSClient.cpp:91
std::list< std::shared_ptr< msg > > msgs_
Definition: WSClient.cpp:108
boost::asio::ip::tcp::socket stream_
Definition: WSClient.cpp:94
std::condition_variable cv_
Definition: WSClient.cpp:107
std::optional< Json::Value > findMsg(std::chrono::milliseconds const &timeout, std::function< bool(Json::Value const &)> pred) override
Retrieve a message that meets the predicate criteria.
Definition: WSClient.cpp:230
T count(T... args)
T join(T... args)
std::unique_ptr< WSClient > makeWSClient(Config const &cfg, bool v2, unsigned rpc_version, std::unordered_map< std::string, std::string > const &headers)
Returns a client operating through WebSockets/S.
Definition: WSClient.cpp:300
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: algorithm.h:26
void parse_Port(ParsedPort &port, Section const &section, std::ostream &log)
Definition: Port.cpp:199
std::string to_string(base_uint< Bits, Tag > const &a)
Definition: base_uint.h:629
void Rethrow()
Rethrow the exception currently being handled.
Definition: contract.h:48
T pop_back(T... args)
T push_front(T... args)
T resize(T... args)
T size(T... args)
std::optional< std::uint16_t > port
Definition: Port.h:115
std::set< std::string, boost::beast::iless > protocol
Definition: Port.h:101
std::optional< boost::asio::ip::address > ip
Definition: Port.h:114
msg(Json::Value &&jv_)
Definition: WSClient.cpp:48
T to_string(T... args)