rippled
Loading...
Searching...
No Matches
WSClient.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2016 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <test/jtx.h>
21#include <test/jtx/WSClient.h>
22
23#include <xrpl/json/json_reader.h>
24#include <xrpl/json/to_string.h>
25#include <xrpl/protocol/jss.h>
26#include <xrpl/server/Port.h>
27
28#include <boost/asio/executor_work_guard.hpp>
29#include <boost/asio/io_context.hpp>
30#include <boost/asio/strand.hpp>
31#include <boost/beast/core/multi_buffer.hpp>
32#include <boost/beast/websocket.hpp>
33
34#include <iostream>
35#include <string>
36#include <unordered_map>
37
38namespace ripple {
39namespace test {
40
41class WSClientImpl : public WSClient
42{
43 using error_code = boost::system::error_code;
44
45 struct msg
46 {
48
49 explicit msg(Json::Value&& jv_) : jv(jv_)
50 {
51 }
52 };
53
54 static boost::asio::ip::tcp::endpoint
55 getEndpoint(BasicConfig const& cfg, bool v2)
56 {
57 auto& log = std::cerr;
58 ParsedPort common;
59 parse_Port(common, cfg["server"], log);
60 auto const ps = v2 ? "ws2" : "ws";
61 for (auto const& name : cfg.section("server").values())
62 {
63 if (!cfg.exists(name))
64 continue;
65 ParsedPort pp;
66 parse_Port(pp, cfg[name], log);
67 if (pp.protocol.count(ps) == 0)
68 continue;
69 using namespace boost::asio::ip;
70 if (pp.ip && pp.ip->is_unspecified())
71 *pp.ip = pp.ip->is_v6() ? address{address_v6::loopback()}
72 : address{address_v4::loopback()};
73
74 if (!pp.port)
75 Throw<std::runtime_error>("Use fixConfigPorts with auto ports");
76
77 return {*pp.ip, *pp.port};
78 }
79 Throw<std::runtime_error>("Missing WebSocket port");
80 return {}; // Silence compiler control paths return value warning
81 }
82
83 template <class ConstBuffers>
84 static std::string
85 buffer_string(ConstBuffers const& b)
86 {
87 using boost::asio::buffer;
88 using boost::asio::buffer_size;
90 s.resize(buffer_size(b));
91 buffer_copy(buffer(&s[0], s.size()), b);
92 return s;
93 }
94
95 boost::asio::io_context ios_;
96 std::optional<boost::asio::executor_work_guard<
97 boost::asio::io_context::executor_type>>
99 boost::asio::strand<boost::asio::io_context::executor_type> strand_;
101 boost::asio::ip::tcp::socket stream_;
102 boost::beast::websocket::stream<boost::asio::ip::tcp::socket&> ws_;
103 boost::beast::multi_buffer rb_;
104
105 bool peerClosed_ = false;
106
107 // synchronize destructor
108 bool b0_ = false;
111
112 // synchronize message queue
116
117 unsigned rpc_version_;
118
119 void
121 {
122 boost::asio::post(
123 ios_, boost::asio::bind_executor(strand_, [this] {
124 if (!peerClosed_)
125 {
126 ws_.async_close(
127 {},
128 boost::asio::bind_executor(strand_, [&](error_code) {
129 try
130 {
131 stream_.cancel();
132 }
133 catch (boost::system::system_error const&)
134 {
135 // ignored
136 }
137 }));
138 }
139 }));
141 thread_.join();
142 }
143
144public:
146 Config const& cfg,
147 bool v2,
148 unsigned rpc_version,
150 : work_(std::in_place, boost::asio::make_work_guard(ios_))
151 , strand_(boost::asio::make_strand(ios_))
152 , thread_([&] { ios_.run(); })
153 , stream_(ios_)
154 , ws_(stream_)
155 , rpc_version_(rpc_version)
156 {
157 try
158 {
159 auto const ep = getEndpoint(cfg, v2);
160 stream_.connect(ep);
161 ws_.set_option(boost::beast::websocket::stream_base::decorator(
162 [&](boost::beast::websocket::request_type& req) {
163 for (auto const& h : headers)
164 req.set(h.first, h.second);
165 }));
166 ws_.handshake(
167 ep.address().to_string() + ":" + std::to_string(ep.port()),
168 "/");
169 ws_.async_read(
170 rb_,
171 boost::asio::bind_executor(
172 strand_,
173 std::bind(
175 this,
176 std::placeholders::_1)));
177 }
178 catch (std::exception&)
179 {
180 cleanup();
181 Rethrow();
182 }
183 }
184
185 ~WSClientImpl() override
186 {
187 cleanup();
188 }
189
191 invoke(std::string const& cmd, Json::Value const& params) override
192 {
193 using boost::asio::buffer;
194 using namespace std::chrono_literals;
195
196 {
197 Json::Value jp;
198 if (params)
199 jp = params;
200 if (rpc_version_ == 2)
201 {
202 jp[jss::method] = cmd;
203 jp[jss::jsonrpc] = "2.0";
204 jp[jss::ripplerpc] = "2.0";
205 jp[jss::id] = 5;
206 }
207 else
208 jp[jss::command] = cmd;
209 auto const s = to_string(jp);
210 ws_.write_some(true, buffer(s));
211 }
212
213 auto jv = findMsg(5s, [&](Json::Value const& jval) {
214 return jval[jss::type] == jss::response;
215 });
216 if (jv)
217 {
218 // Normalize JSON output
219 jv->removeMember(jss::type);
220 if ((*jv).isMember(jss::status) && (*jv)[jss::status] == jss::error)
221 {
222 Json::Value ret;
223 ret[jss::result] = *jv;
224 if ((*jv).isMember(jss::error))
225 ret[jss::error] = (*jv)[jss::error];
226 ret[jss::status] = jss::error;
227 return ret;
228 }
229 if ((*jv).isMember(jss::status) && (*jv).isMember(jss::result))
230 (*jv)[jss::result][jss::status] = (*jv)[jss::status];
231 return *jv;
232 }
233 return {};
234 }
235
237 getMsg(std::chrono::milliseconds const& timeout) override
238 {
240 {
242 if (!cv_.wait_for(lock, timeout, [&] { return !msgs_.empty(); }))
243 return std::nullopt;
244 m = std::move(msgs_.back());
245 msgs_.pop_back();
246 }
247 return std::move(m->jv);
248 }
249
252 std::chrono::milliseconds const& timeout,
253 std::function<bool(Json::Value const&)> pred) override
254 {
256 {
258 if (!cv_.wait_for(lock, timeout, [&] {
259 for (auto it = msgs_.begin(); it != msgs_.end(); ++it)
260 {
261 if (pred((*it)->jv))
262 {
263 m = std::move(*it);
264 msgs_.erase(it);
265 return true;
266 }
267 }
268 return false;
269 }))
270 {
271 return std::nullopt;
272 }
273 }
274 return std::move(m->jv);
275 }
276
277 unsigned
278 version() const override
279 {
280 return rpc_version_;
281 }
282
283private:
284 void
286 {
287 if (ec)
288 {
289 if (ec == boost::beast::websocket::error::closed)
290 peerClosed_ = true;
291 return;
292 }
293
294 Json::Value jv;
295 Json::Reader jr;
296 jr.parse(buffer_string(rb_.data()), jv);
297 rb_.consume(rb_.size());
298 auto m = std::make_shared<msg>(std::move(jv));
299 {
300 std::lock_guard lock(m_);
301 msgs_.push_front(m);
302 cv_.notify_all();
303 }
304 ws_.async_read(
305 rb_,
306 boost::asio::bind_executor(
307 strand_,
308 std::bind(
309 &WSClientImpl::on_read_msg, this, std::placeholders::_1)));
310 }
311
312 // Called when the read op terminates
313 void
315 {
316 std::lock_guard lock(m0_);
317 b0_ = true;
318 cv0_.notify_all();
319 }
320};
321
324 Config const& cfg,
325 bool v2,
326 unsigned rpc_version,
328{
329 return std::make_unique<WSClientImpl>(cfg, v2, rpc_version, headers);
330}
331
332} // namespace test
333} // namespace ripple
T bind(T... args)
Unserialize a JSON document into a Value.
Definition json_reader.h:39
bool parse(std::string const &document, Value &root)
Read a Value from a JSON document.
Represents a JSON value.
Definition json_value.h:149
Holds unparsed configuration information.
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Section & section(std::string const &name)
Returns the section with the given name.
std::vector< std::string > const & values() const
Returns all the values in the section.
Definition BasicConfig.h:79
boost::asio::io_context ios_
Definition WSClient.cpp:95
unsigned version() const override
Get RPC 1.0 or RPC 2.0.
Definition WSClient.cpp:278
WSClientImpl(Config const &cfg, bool v2, unsigned rpc_version, std::unordered_map< std::string, std::string > const &headers={})
Definition WSClient.cpp:145
boost::system::error_code error_code
Definition WSClient.cpp:43
std::condition_variable cv0_
Definition WSClient.cpp:110
static std::string buffer_string(ConstBuffers const &b)
Definition WSClient.cpp:85
boost::beast::multi_buffer rb_
Definition WSClient.cpp:103
std::optional< Json::Value > getMsg(std::chrono::milliseconds const &timeout) override
Retrieve a message.
Definition WSClient.cpp:237
Json::Value invoke(std::string const &cmd, Json::Value const &params) override
Submit a command synchronously.
Definition WSClient.cpp:191
boost::asio::strand< boost::asio::io_context::executor_type > strand_
Definition WSClient.cpp:99
static boost::asio::ip::tcp::endpoint getEndpoint(BasicConfig const &cfg, bool v2)
Definition WSClient.cpp:55
void on_read_msg(error_code const &ec)
Definition WSClient.cpp:285
boost::beast::websocket::stream< boost::asio::ip::tcp::socket & > ws_
Definition WSClient.cpp:102
std::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > work_
Definition WSClient.cpp:98
std::list< std::shared_ptr< msg > > msgs_
Definition WSClient.cpp:115
boost::asio::ip::tcp::socket stream_
Definition WSClient.cpp:101
std::condition_variable cv_
Definition WSClient.cpp:114
std::optional< Json::Value > findMsg(std::chrono::milliseconds const &timeout, std::function< bool(Json::Value const &)> pred) override
Retrieve a message that meets the predicate criteria.
Definition WSClient.cpp:251
T count(T... args)
T is_same_v
T join(T... args)
std::unique_ptr< WSClient > makeWSClient(Config const &cfg, bool v2, unsigned rpc_version, std::unordered_map< std::string, std::string > const &headers)
Returns a client operating through WebSockets/S.
Definition WSClient.cpp:323
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:25
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section. If the named value is not found or doesn't parse as a T,...
void parse_Port(ParsedPort &port, Section const &section, std::ostream &log)
Definition Port.cpp:214
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:630
void Rethrow()
Rethrow the exception currently being handled.
Definition contract.h:48
STL namespace.
T push_front(T... args)
T resize(T... args)
T size(T... args)
std::optional< std::uint16_t > port
Definition Port.h:116
std::set< std::string, boost::beast::iless > protocol
Definition Port.h:102
std::optional< boost::asio::ip::address > ip
Definition Port.h:115
T to_string(T... args)