rippled
WSClient.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2016 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/json/json_reader.h>
21 #include <ripple/json/to_string.h>
22 #include <ripple/protocol/jss.h>
23 #include <ripple/server/Port.h>
24 #include <boost/beast/core/multi_buffer.hpp>
25 #include <boost/beast/websocket.hpp>
26 #include <test/jtx.h>
27 #include <test/jtx/WSClient.h>
28 
29 #include <condition_variable>
30 #include <string>
31 #include <unordered_map>
32 
33 #include <iostream>
34 
35 #include <ripple/beast/unit_test.h>
36 
37 namespace ripple {
38 namespace test {
39 
40 class WSClientImpl : public WSClient
41 {
42  using error_code = boost::system::error_code;
43 
44  struct msg
45  {
47 
48  explicit msg(Json::Value&& jv_) : jv(jv_)
49  {
50  }
51  };
52 
53  static boost::asio::ip::tcp::endpoint
54  getEndpoint(BasicConfig const& cfg, bool v2)
55  {
56  auto& log = std::cerr;
57  ParsedPort common;
58  parse_Port(common, cfg["server"], log);
59  auto const ps = v2 ? "ws2" : "ws";
60  for (auto const& name : cfg.section("server").values())
61  {
62  if (!cfg.exists(name))
63  continue;
64  ParsedPort pp;
65  parse_Port(pp, cfg[name], log);
66  if (pp.protocol.count(ps) == 0)
67  continue;
68  using namespace boost::asio::ip;
69  if (pp.ip && pp.ip->is_unspecified())
70  *pp.ip = pp.ip->is_v6() ? address{address_v6::loopback()}
71  : address{address_v4::loopback()};
72  return {*pp.ip, *pp.port};
73  }
74  Throw<std::runtime_error>("Missing WebSocket port");
75  return {}; // Silence compiler control paths return value warning
76  }
77 
78  template <class ConstBuffers>
79  static std::string
80  buffer_string(ConstBuffers const& b)
81  {
82  using boost::asio::buffer;
83  using boost::asio::buffer_size;
84  std::string s;
85  s.resize(buffer_size(b));
86  buffer_copy(buffer(&s[0], s.size()), b);
87  return s;
88  }
89 
90  boost::asio::io_service ios_;
91  boost::optional<boost::asio::io_service::work> work_;
92  boost::asio::io_service::strand strand_;
94  boost::asio::ip::tcp::socket stream_;
95  boost::beast::websocket::stream<boost::asio::ip::tcp::socket&> ws_;
96  boost::beast::multi_buffer rb_;
97 
98  bool peerClosed_ = false;
99 
100  // synchronize destructor
101  bool b0_ = false;
104 
105  // synchronize message queue
109 
110  unsigned rpc_version_;
111 
112  void
114  {
115  ios_.post(strand_.wrap([this] {
116  if (!peerClosed_)
117  {
118  ws_.async_close({}, strand_.wrap([&](error_code ec) {
119  stream_.cancel(ec);
120  }));
121  }
122  }));
123  work_ = boost::none;
124  thread_.join();
125  }
126 
127 public:
129  Config const& cfg,
130  bool v2,
131  unsigned rpc_version,
133  : work_(ios_)
134  , strand_(ios_)
135  , thread_([&] { ios_.run(); })
136  , stream_(ios_)
137  , ws_(stream_)
138  , rpc_version_(rpc_version)
139  {
140  try
141  {
142  auto const ep = getEndpoint(cfg, v2);
143  stream_.connect(ep);
144  ws_.handshake_ex(
145  ep.address().to_string() + ":" + std::to_string(ep.port()),
146  "/",
147  [&](boost::beast::websocket::request_type& req) {
148  for (auto const& h : headers)
149  req.set(h.first, h.second);
150  });
151  ws_.async_read(
152  rb_,
153  strand_.wrap(std::bind(
154  &WSClientImpl::on_read_msg, this, std::placeholders::_1)));
155  }
156  catch (std::exception&)
157  {
158  cleanup();
159  Rethrow();
160  }
161  }
162 
163  ~WSClientImpl() override
164  {
165  cleanup();
166  }
167 
169  invoke(std::string const& cmd, Json::Value const& params) override
170  {
171  using boost::asio::buffer;
172  using namespace std::chrono_literals;
173 
174  {
175  Json::Value jp;
176  if (params)
177  jp = params;
178  if (rpc_version_ == 2)
179  {
180  jp[jss::method] = cmd;
181  jp[jss::jsonrpc] = "2.0";
182  jp[jss::ripplerpc] = "2.0";
183  jp[jss::id] = 5;
184  }
185  else
186  jp[jss::command] = cmd;
187  auto const s = to_string(jp);
188  ws_.write_some(true, buffer(s));
189  }
190 
191  auto jv = findMsg(5s, [&](Json::Value const& jval) {
192  return jval[jss::type] == jss::response;
193  });
194  if (jv)
195  {
196  // Normalize JSON output
197  jv->removeMember(jss::type);
198  if ((*jv).isMember(jss::status) && (*jv)[jss::status] == jss::error)
199  {
200  Json::Value ret;
201  ret[jss::result] = *jv;
202  if ((*jv).isMember(jss::error))
203  ret[jss::error] = (*jv)[jss::error];
204  ret[jss::status] = jss::error;
205  return ret;
206  }
207  if ((*jv).isMember(jss::status) && (*jv).isMember(jss::result))
208  (*jv)[jss::result][jss::status] = (*jv)[jss::status];
209  return *jv;
210  }
211  return {};
212  }
213 
214  boost::optional<Json::Value>
215  getMsg(std::chrono::milliseconds const& timeout) override
216  {
218  {
220  if (!cv_.wait_for(lock, timeout, [&] { return !msgs_.empty(); }))
221  return boost::none;
222  m = std::move(msgs_.back());
223  msgs_.pop_back();
224  }
225  return std::move(m->jv);
226  }
227 
228  boost::optional<Json::Value>
230  std::chrono::milliseconds const& timeout,
231  std::function<bool(Json::Value const&)> pred) override
232  {
234  {
236  if (!cv_.wait_for(lock, timeout, [&] {
237  for (auto it = msgs_.begin(); it != msgs_.end(); ++it)
238  {
239  if (pred((*it)->jv))
240  {
241  m = std::move(*it);
242  msgs_.erase(it);
243  return true;
244  }
245  }
246  return false;
247  }))
248  {
249  return boost::none;
250  }
251  }
252  return std::move(m->jv);
253  }
254 
255  unsigned
256  version() const override
257  {
258  return rpc_version_;
259  }
260 
261 private:
262  void
264  {
265  if (ec)
266  {
267  if (ec == boost::beast::websocket::error::closed)
268  peerClosed_ = true;
269  return;
270  }
271 
272  Json::Value jv;
273  Json::Reader jr;
274  jr.parse(buffer_string(rb_.data()), jv);
275  rb_.consume(rb_.size());
276  auto m = std::make_shared<msg>(std::move(jv));
277  {
278  std::lock_guard lock(m_);
279  msgs_.push_front(m);
280  cv_.notify_all();
281  }
282  ws_.async_read(
283  rb_,
284  strand_.wrap(std::bind(
285  &WSClientImpl::on_read_msg, this, std::placeholders::_1)));
286  }
287 
288  // Called when the read op terminates
289  void
291  {
292  std::lock_guard lock(m0_);
293  b0_ = true;
294  cv0_.notify_all();
295  }
296 };
297 
300  Config const& cfg,
301  bool v2,
302  unsigned rpc_version,
304 {
305  return std::make_unique<WSClientImpl>(cfg, v2, rpc_version, headers);
306 }
307 
308 } // namespace test
309 } // namespace ripple
ripple::test::WSClientImpl::version
unsigned version() const override
Get RPC 1.0 or RPC 2.0.
Definition: WSClient.cpp:256
ripple::test::WSClientImpl::cv_
std::condition_variable cv_
Definition: WSClient.cpp:107
std::string::resize
T resize(T... args)
ripple::test::WSClientImpl::ios_
boost::asio::io_service ios_
Definition: WSClient.cpp:90
std::bind
T bind(T... args)
std::string
STL class.
std::shared_ptr
STL class.
std::exception
STL class.
std::list
STL class.
ripple::test::WSClientImpl::error_code
boost::system::error_code error_code
Definition: WSClient.cpp:42
ripple::test::WSClientImpl::getMsg
boost::optional< Json::Value > getMsg(std::chrono::milliseconds const &timeout) override
Retrieve a message.
Definition: WSClient.cpp:215
std::string::size
T size(T... args)
ripple::test::WSClientImpl::msg::msg
msg(Json::Value &&jv_)
Definition: WSClient.cpp:48
std::chrono::milliseconds
ripple::ParsedPort::ip
boost::optional< boost::asio::ip::address > ip
Definition: Port.h:108
ripple::test::WSClientImpl::on_read_msg
void on_read_msg(error_code const &ec)
Definition: WSClient.cpp:263
std::lock_guard
STL class.
std::cerr
ripple::test::WSClientImpl::stream_
boost::asio::ip::tcp::socket stream_
Definition: WSClient.cpp:94
ripple::test::WSClientImpl::msg::jv
Json::Value jv
Definition: WSClient.cpp:46
ripple::test::WSClientImpl::invoke
Json::Value invoke(std::string const &cmd, Json::Value const &params) override
Submit a command synchronously.
Definition: WSClient.cpp:169
std::function
ripple::to_string
std::string to_string(ListDisposition disposition)
Definition: ValidatorList.cpp:41
ripple::parse_Port
void parse_Port(ParsedPort &port, Section const &section, std::ostream &log)
Definition: Port.cpp:139
Json::Reader
Unserialize a JSON document into a Value.
Definition: json_reader.h:36
ripple::test::WSClientImpl::buffer_string
static std::string buffer_string(ConstBuffers const &b)
Definition: WSClient.cpp:80
ripple::test::WSClientImpl::findMsg
boost::optional< Json::Value > findMsg(std::chrono::milliseconds const &timeout, std::function< bool(Json::Value const &)> pred) override
Retrieve a message that meets the predicate criteria.
Definition: WSClient.cpp:229
ripple::ParsedPort
Definition: Port.h:90
iostream
ripple::test::WSClientImpl::b0_
bool b0_
Definition: WSClient.cpp:101
ripple::test::WSClientImpl::ws_
boost::beast::websocket::stream< boost::asio::ip::tcp::socket & > ws_
Definition: WSClient.cpp:95
ripple::test::WSClientImpl::m0_
std::mutex m0_
Definition: WSClient.cpp:102
ripple::Section::values
std::vector< std::string > const & values() const
Returns all the values in the section.
Definition: BasicConfig.h:76
ripple::test::WSClientImpl
Definition: WSClient.cpp:40
ripple::test::WSClientImpl::on_read_done
void on_read_done()
Definition: WSClient.cpp:290
std::thread
STL class.
ripple::Config
Definition: Config.h:66
ripple::test::WSClientImpl::~WSClientImpl
~WSClientImpl() override
Definition: WSClient.cpp:163
ripple::Rethrow
void Rethrow()
Rethrow the exception currently being handled.
Definition: contract.h:48
std::unique_lock
STL class.
std::to_string
T to_string(T... args)
ripple::test::WSClientImpl::msgs_
std::list< std::shared_ptr< msg > > msgs_
Definition: WSClient.cpp:108
ripple::test::WSClientImpl::peerClosed_
bool peerClosed_
Definition: WSClient.cpp:98
std::condition_variable::wait_for
T wait_for(T... args)
ripple::test::WSClient
Definition: WSClient.h:32
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::ParsedPort::port
boost::optional< std::uint16_t > port
Definition: Port.h:109
ripple::test::WSClientImpl::rpc_version_
unsigned rpc_version_
Definition: WSClient.cpp:110
Json::Reader::parse
bool parse(std::string const &document, Value &root)
Read a Value from a JSON document.
Definition: json_reader.cpp:73
condition_variable
std::set::count
T count(T... args)
ripple::test::makeWSClient
std::unique_ptr< WSClient > makeWSClient(Config const &cfg, bool v2, unsigned rpc_version, std::unordered_map< std::string, std::string > const &headers)
Returns a client operating through WebSockets/S.
Definition: WSClient.cpp:299
ripple::test::WSClientImpl::msg
Definition: WSClient.cpp:44
ripple::test::WSClientImpl::m_
std::mutex m_
Definition: WSClient.cpp:106
std::mutex
STL class.
ripple::test::WSClientImpl::rb_
boost::beast::multi_buffer rb_
Definition: WSClient.cpp:96
ripple::test::WSClientImpl::thread_
std::thread thread_
Definition: WSClient.cpp:93
ripple::ParsedPort::protocol
std::set< std::string, boost::beast::iless > protocol
Definition: Port.h:95
ripple::test::WSClientImpl::cv0_
std::condition_variable cv0_
Definition: WSClient.cpp:103
std::unique_ptr
STL class.
ripple::test::WSClientImpl::strand_
boost::asio::io_service::strand strand_
Definition: WSClient.cpp:92
ripple::test::WSClientImpl::WSClientImpl
WSClientImpl(Config const &cfg, bool v2, unsigned rpc_version, std::unordered_map< std::string, std::string > const &headers={})
Definition: WSClient.cpp:128
unordered_map
ripple::test::WSClientImpl::work_
boost::optional< boost::asio::io_service::work > work_
Definition: WSClient.cpp:91
ripple::test::WSClientImpl::cleanup
void cleanup()
Definition: WSClient.cpp:113
std::condition_variable::notify_all
T notify_all(T... args)
ripple::BasicConfig
Holds unparsed configuration information.
Definition: BasicConfig.h:178
std::thread::join
T join(T... args)
ripple::test::WSClientImpl::getEndpoint
static boost::asio::ip::tcp::endpoint getEndpoint(BasicConfig const &cfg, bool v2)
Definition: WSClient.cpp:54
ripple::BasicConfig::exists
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Definition: BasicConfig.cpp:132
Json::Value
Represents a JSON value.
Definition: json_value.h:145
ripple::BasicConfig::section
Section & section(std::string const &name)
Returns the section with the given name.
Definition: BasicConfig.cpp:138
string