rippled
BaseWSPeer.h
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright(c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #ifndef RIPPLE_SERVER_BASEWSPEER_H_INCLUDED
21 #define RIPPLE_SERVER_BASEWSPEER_H_INCLUDED
22 
23 #include <ripple/basics/safe_cast.h>
24 #include <ripple/beast/utility/rngfill.h>
25 #include <ripple/crypto/csprng.h>
26 #include <ripple/protocol/BuildInfo.h>
27 #include <ripple/server/impl/BasePeer.h>
28 #include <ripple/server/impl/LowestLayer.h>
29 #include <boost/beast/core/multi_buffer.hpp>
30 #include <boost/beast/http/message.hpp>
31 #include <boost/beast/websocket.hpp>
32 
33 #include <cassert>
34 #include <functional>
35 
36 namespace ripple {
37 
39 template <class Handler, class Impl>
40 class BaseWSPeer : public BasePeer<Handler, Impl>, public WSSession
41 {
42 protected:
44  using error_code = boost::system::error_code;
45  using endpoint_type = boost::asio::ip::tcp::endpoint;
46  using waitable_timer = boost::asio::basic_waitable_timer<clock_type>;
48 
49 private:
50  friend class BasePeer<Handler, Impl>;
51 
53  boost::beast::multi_buffer rb_;
54  boost::beast::multi_buffer wb_;
59  bool do_close_ = false;
60  boost::beast::websocket::close_reason cr_;
62  bool close_on_timer_ = false;
63  bool ping_active_ = false;
64  boost::beast::websocket::ping_data payload_;
67  void(boost::beast::websocket::frame_type, boost::beast::string_view)>
69 
70 public:
71  template <class Body, class Headers>
72  BaseWSPeer(
73  Port const& port,
74  Handler& handler,
75  boost::asio::executor const& executor,
76  waitable_timer timer,
77  endpoint_type remote_address,
78  boost::beast::http::request<Body, Headers>&& request,
79  beast::Journal journal);
80 
81  void
82  run() override;
83 
84  //
85  // WSSession
86  //
87 
88  Port const&
89  port() const override
90  {
91  return this->port_;
92  }
93 
94  http_request_type const&
95  request() const override
96  {
97  return this->request_;
98  }
99 
100  boost::asio::ip::tcp::endpoint const&
101  remote_endpoint() const override
102  {
103  return this->remote_address_;
104  }
105 
106  void
107  send(std::shared_ptr<WSMsg> w) override;
108 
109  void
110  close() override;
111 
112  void
113  close(boost::beast::websocket::close_reason const& reason) override;
114 
115  void
116  complete() override;
117 
118 protected:
119  Impl&
121  {
122  return *static_cast<Impl*>(this);
123  }
124 
125  void
126  on_ws_handshake(error_code const& ec);
127 
128  void
129  do_write();
130 
131  void
132  on_write(error_code const& ec);
133 
134  void
135  on_write_fin(error_code const& ec);
136 
137  void
138  do_read();
139 
140  void
141  on_read(error_code const& ec);
142 
143  void
144  on_close(error_code const& ec);
145 
146  void
147  start_timer();
148 
149  void
150  cancel_timer();
151 
152  void
153  on_ping(error_code const& ec);
154 
155  void
156  on_ping_pong(
157  boost::beast::websocket::frame_type kind,
158  boost::beast::string_view payload);
159 
160  void
161  on_timer(error_code ec);
162 
163  template <class String>
164  void
165  fail(error_code ec, String const& what);
166 };
167 
168 //------------------------------------------------------------------------------
169 
170 template <class Handler, class Impl>
171 template <class Body, class Headers>
173  Port const& port,
174  Handler& handler,
175  boost::asio::executor const& executor,
176  waitable_timer timer,
177  endpoint_type remote_address,
178  boost::beast::http::request<Body, Headers>&& request,
179  beast::Journal journal)
180  : BasePeer<Handler, Impl>(port, handler, executor, remote_address, journal)
181  , request_(std::move(request))
182  , timer_(std::move(timer))
183  , payload_("12345678") // ensures size is 8 bytes
184 {
185 }
186 
187 template <class Handler, class Impl>
188 void
190 {
191  if (!strand_.running_in_this_thread())
192  return post(
193  strand_, std::bind(&BaseWSPeer::run, impl().shared_from_this()));
194  impl().ws_.set_option(port().pmd_options);
195  // Must manage the control callback memory outside of the `control_callback`
196  // function
197  control_callback_ = std::bind(
198  &BaseWSPeer::on_ping_pong,
199  this,
200  std::placeholders::_1,
201  std::placeholders::_2);
202  impl().ws_.control_callback(control_callback_);
203  start_timer();
204  close_on_timer_ = true;
205  impl().ws_.set_option(
206  boost::beast::websocket::stream_base::decorator([](auto& res) {
207  res.set(
208  boost::beast::http::field::server,
209  BuildInfo::getFullVersionString());
210  }));
211  impl().ws_.async_accept(
212  request_,
213  bind_executor(
214  strand_,
215  std::bind(
216  &BaseWSPeer::on_ws_handshake,
217  impl().shared_from_this(),
218  std::placeholders::_1)));
219 }
220 
221 template <class Handler, class Impl>
222 void
224 {
225  if (!strand_.running_in_this_thread())
226  return post(
227  strand_,
228  std::bind(
229  &BaseWSPeer::send, impl().shared_from_this(), std::move(w)));
230  if (do_close_)
231  return;
232  if (wq_.size() > port().ws_queue_limit)
233  {
234  cr_.code = safe_cast<decltype(cr_.code)>(
235  boost::beast::websocket::close_code::policy_error);
236  cr_.reason = "Policy error: client is too slow.";
237  JLOG(this->j_.info()) << cr_.reason;
238  wq_.erase(std::next(wq_.begin()), wq_.end());
239  close(cr_);
240  return;
241  }
242  wq_.emplace_back(std::move(w));
243  if (wq_.size() == 1)
244  on_write({});
245 }
246 
247 template <class Handler, class Impl>
248 void
250 {
251  close(boost::beast::websocket::close_reason{});
252 }
253 
254 template <class Handler, class Impl>
255 void
257  boost::beast::websocket::close_reason const& reason)
258 {
259  if (!strand_.running_in_this_thread())
260  return post(strand_, [self = impl().shared_from_this(), reason] {
261  self->close(reason);
262  });
263  if (do_close_)
264  return;
265  do_close_ = true;
266  if (wq_.empty())
267  {
268  impl().ws_.async_close(
269  reason,
270  bind_executor(
271  strand_,
272  [self = impl().shared_from_this()](
273  boost::beast::error_code const& ec) {
274  self->on_close(ec);
275  }));
276  }
277  else
278  {
279  cr_ = reason;
280  }
281 }
282 
283 template <class Handler, class Impl>
284 void
286 {
287  if (!strand_.running_in_this_thread())
288  return post(
289  strand_,
290  std::bind(&BaseWSPeer::complete, impl().shared_from_this()));
291  do_read();
292 }
293 
294 template <class Handler, class Impl>
295 void
297 {
298  if (ec)
299  return fail(ec, "on_ws_handshake");
300  close_on_timer_ = false;
301  do_read();
302 }
303 
304 template <class Handler, class Impl>
305 void
307 {
308  if (!strand_.running_in_this_thread())
309  return post(
310  strand_,
311  std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
312  on_write({});
313 }
314 
315 template <class Handler, class Impl>
316 void
318 {
319  if (ec)
320  return fail(ec, "write");
321  auto& w = *wq_.front();
322  auto const result = w.prepare(
323  65536, std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
324  if (boost::indeterminate(result.first))
325  return;
326  start_timer();
327  if (!result.first)
328  impl().ws_.async_write_some(
329  static_cast<bool>(result.first),
330  result.second,
331  bind_executor(
332  strand_,
333  std::bind(
334  &BaseWSPeer::on_write,
335  impl().shared_from_this(),
336  std::placeholders::_1)));
337  else
338  impl().ws_.async_write_some(
339  static_cast<bool>(result.first),
340  result.second,
341  bind_executor(
342  strand_,
343  std::bind(
344  &BaseWSPeer::on_write_fin,
345  impl().shared_from_this(),
346  std::placeholders::_1)));
347 }
348 
349 template <class Handler, class Impl>
350 void
352 {
353  if (ec)
354  return fail(ec, "write_fin");
355  wq_.pop_front();
356  if (do_close_)
357  {
358  impl().ws_.async_close(
359  cr_,
360  bind_executor(
361  strand_,
362  std::bind(
363  &BaseWSPeer::on_close,
364  impl().shared_from_this(),
365  std::placeholders::_1)));
366  }
367  else if (!wq_.empty())
368  on_write({});
369 }
370 
371 template <class Handler, class Impl>
372 void
374 {
375  if (!strand_.running_in_this_thread())
376  return post(
377  strand_,
378  std::bind(&BaseWSPeer::do_read, impl().shared_from_this()));
379  impl().ws_.async_read(
380  rb_,
381  bind_executor(
382  strand_,
383  std::bind(
384  &BaseWSPeer::on_read,
385  impl().shared_from_this(),
386  std::placeholders::_1)));
387 }
388 
389 template <class Handler, class Impl>
390 void
392 {
393  if (ec == boost::beast::websocket::error::closed)
394  return on_close({});
395  if (ec)
396  return fail(ec, "read");
397  auto const& data = rb_.data();
399  b.reserve(std::distance(data.begin(), data.end()));
400  std::copy(data.begin(), data.end(), std::back_inserter(b));
401  this->handler_.onWSMessage(impl().shared_from_this(), b);
402  rb_.consume(rb_.size());
403 }
404 
405 template <class Handler, class Impl>
406 void
408 {
409  cancel_timer();
410 }
411 
412 template <class Handler, class Impl>
413 void
415 {
416  // Max seconds without completing a message
417  static constexpr std::chrono::seconds timeout{30};
418  static constexpr std::chrono::seconds timeoutLocal{3};
419  error_code ec;
420  timer_.expires_from_now(
421  remote_endpoint().address().is_loopback() ? timeoutLocal : timeout, ec);
422  if (ec)
423  return fail(ec, "start_timer");
424  timer_.async_wait(bind_executor(
425  strand_,
426  std::bind(
428  impl().shared_from_this(),
429  std::placeholders::_1)));
430 }
431 
432 // Convenience for discarding the error code
433 template <class Handler, class Impl>
434 void
436 {
437  error_code ec;
438  timer_.cancel(ec);
439 }
440 
441 template <class Handler, class Impl>
442 void
444 {
445  if (ec == boost::asio::error::operation_aborted)
446  return;
447  ping_active_ = false;
448  if (!ec)
449  return;
450  fail(ec, "on_ping");
451 }
452 
453 template <class Handler, class Impl>
454 void
456  boost::beast::websocket::frame_type kind,
457  boost::beast::string_view payload)
458 {
459  if (kind == boost::beast::websocket::frame_type::pong)
460  {
461  boost::beast::string_view p(payload_.begin());
462  if (payload == p)
463  {
464  close_on_timer_ = false;
465  JLOG(this->j_.trace()) << "got matching pong";
466  }
467  else
468  {
469  JLOG(this->j_.trace()) << "got pong";
470  }
471  }
472 }
473 
474 template <class Handler, class Impl>
475 void
477 {
478  if (ec == boost::asio::error::operation_aborted)
479  return;
480  if (!ec)
481  {
482  if (!close_on_timer_ || !ping_active_)
483  {
484  start_timer();
485  close_on_timer_ = true;
486  ping_active_ = true;
487  // cryptographic is probably overkill..
488  beast::rngfill(payload_.begin(), payload_.size(), crypto_prng());
489  impl().ws_.async_ping(
490  payload_,
491  bind_executor(
492  strand_,
493  std::bind(
494  &BaseWSPeer::on_ping,
495  impl().shared_from_this(),
496  std::placeholders::_1)));
497  JLOG(this->j_.trace()) << "sent ping";
498  return;
499  }
500  ec = boost::system::errc::make_error_code(
501  boost::system::errc::timed_out);
502  }
503  fail(ec, "timer");
504 }
505 
506 template <class Handler, class Impl>
507 template <class String>
508 void
510 {
511  assert(strand_.running_in_this_thread());
512 
513  cancel_timer();
514  if (!ec_ && ec != boost::asio::error::operation_aborted)
515  {
516  ec_ = ec;
517  JLOG(this->j_.trace()) << what << ": " << ec.message();
518  ripple::get_lowest_layer(impl().ws_).socket().close(ec);
519  }
520 }
521 
522 } // namespace ripple
523 
524 #endif
ripple::BaseWSPeer::do_read
void do_read()
Definition: BaseWSPeer.h:373
ripple::BaseWSPeer::on_ws_handshake
void on_ws_handshake(error_code const &ec)
Definition: BaseWSPeer.h:296
std::chrono::system_clock
ripple::BaseWSPeer::close
void close() override
Definition: BaseWSPeer.h:249
ripple::BaseWSPeer::on_timer
void on_timer(error_code ec)
Definition: BaseWSPeer.h:476
ripple::BaseWSPeer::error_code
boost::system::error_code error_code
Definition: BaseWSPeer.h:44
std::bind
T bind(T... args)
ripple::BaseWSPeer::close_on_timer_
bool close_on_timer_
Definition: BaseWSPeer.h:62
std::shared_ptr
STL class.
std::list
STL class.
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:308
ripple::BaseWSPeer::impl
Impl & impl()
Definition: BaseWSPeer.h:120
ripple::BaseWSPeer::on_ping_pong
void on_ping_pong(boost::beast::websocket::frame_type kind, boost::beast::string_view payload)
Definition: BaseWSPeer.h:455
functional
ripple::BasePeer
Definition: BasePeer.h:37
std::vector::reserve
T reserve(T... args)
std::vector
STL class.
ripple::BaseWSPeer::wb_
boost::beast::multi_buffer wb_
Definition: BaseWSPeer.h:54
ripple::BaseWSPeer::timer_
waitable_timer timer_
Definition: BaseWSPeer.h:61
std::back_inserter
T back_inserter(T... args)
ripple::BaseWSPeer::ping_active_
bool ping_active_
Definition: BaseWSPeer.h:63
std::chrono::seconds
ripple::crypto_prng
csprng_engine & crypto_prng()
The default cryptographically secure PRNG.
Definition: csprng.cpp:99
std::distance
T distance(T... args)
ripple::BaseWSPeer::cr_
boost::beast::websocket::close_reason cr_
Definition: BaseWSPeer.h:60
std::function
ripple::BaseWSPeer::on_write
void on_write(error_code const &ec)
Definition: BaseWSPeer.h:317
ripple::BaseWSPeer::wq_
std::list< std::shared_ptr< WSMsg > > wq_
Definition: BaseWSPeer.h:55
ripple::BaseWSPeer::run
void run() override
Definition: BaseWSPeer.h:189
ripple::BaseWSPeer::control_callback_
std::function< void(boost::beast::websocket::frame_type, boost::beast::string_view)> control_callback_
Definition: BaseWSPeer.h:68
ripple::BasePeer< Handler, PlainWSPeer< Handler > >::waitable_timer
boost::asio::basic_waitable_timer< clock_type > waitable_timer
Definition: BasePeer.h:43
ripple::BaseWSPeer::ec_
error_code ec_
Definition: BaseWSPeer.h:65
ripple::BaseWSPeer::start_timer
void start_timer()
Definition: BaseWSPeer.h:414
ripple::BaseWSPeer::on_ping
void on_ping(error_code const &ec)
Definition: BaseWSPeer.h:443
ripple::BaseWSPeer::do_close_
bool do_close_
The socket has been closed, or will close after the next write finishes.
Definition: BaseWSPeer.h:59
ripple::BasePeer< Handler, PlainWSPeer< Handler > >::error_code
boost::system::error_code error_code
Definition: BasePeer.h:41
ripple::BaseWSPeer::send
void send(std::shared_ptr< WSMsg > w) override
Send a WebSockets message.
Definition: BaseWSPeer.h:223
ripple::BaseWSPeer::payload_
boost::beast::websocket::ping_data payload_
Definition: BaseWSPeer.h:64
ripple::BaseWSPeer::remote_endpoint
boost::asio::ip::tcp::endpoint const & remote_endpoint() const override
Definition: BaseWSPeer.h:101
ripple::BaseWSPeer::on_write_fin
void on_write_fin(error_code const &ec)
Definition: BaseWSPeer.h:351
ripple::safe_cast
constexpr std::enable_if_t< std::is_same_v< typename Dest::unit_type, typename Src::unit_type > &&std::is_integral_v< typename Dest::value_type > &&std::is_integral_v< typename Src::value_type >, Dest > safe_cast(Src s) noexcept
Definition: FeeUnits.h:536
ripple::BaseWSPeer::on_close
void on_close(error_code const &ec)
Definition: BaseWSPeer.h:407
beast::Journal::info
Stream info() const
Definition: Journal.h:320
ripple::BasePeer::port_
Port const & port_
Definition: BasePeer.h:45
std::copy
T copy(T... args)
ripple::BaseWSPeer::do_write
void do_write()
Definition: BaseWSPeer.h:306
ripple::BasePeer< Handler, PlainWSPeer< Handler > >::endpoint_type
boost::asio::ip::tcp::endpoint endpoint_type
Definition: BasePeer.h:42
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
ripple::Port
Configuration information for a Server listening port.
Definition: Port.h:48
ripple::BaseWSPeer::complete
void complete() override
Indicate that the response is complete.
Definition: BaseWSPeer.h:285
ripple::BaseWSPeer::fail
void fail(error_code ec, String const &what)
Definition: BaseWSPeer.h:509
ripple::WSSession
Definition: WSSession.h:107
ripple::BaseWSPeer::cancel_timer
void cancel_timer()
Definition: BaseWSPeer.h:435
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
beast::rngfill
void rngfill(void *buffer, std::size_t bytes, Generator &g)
Definition: rngfill.h:33
std
STL namespace.
cassert
ripple::BaseWSPeer::request
http_request_type const & request() const override
Definition: BaseWSPeer.h:95
ripple::BaseWSPeer::on_read
void on_read(error_code const &ec)
Definition: BaseWSPeer.h:391
ripple::BasePeer::remote_address_
endpoint_type remote_address_
Definition: BasePeer.h:47
ripple::BaseWSPeer::rb_
boost::beast::multi_buffer rb_
Definition: BaseWSPeer.h:53
ripple::get_lowest_layer
decltype(auto) get_lowest_layer(T &t) noexcept
Definition: LowestLayer.h:35
ripple::http_request_type
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition: Handshake.h:47
ripple::BaseWSPeer::BaseWSPeer
BaseWSPeer(Port const &port, Handler &handler, boost::asio::executor const &executor, waitable_timer timer, endpoint_type remote_address, boost::beast::http::request< Body, Headers > &&request, beast::Journal journal)
Definition: BaseWSPeer.h:172
ripple::BaseWSPeer::request_
http_request_type request_
Definition: BaseWSPeer.h:52
ripple::BaseWSPeer::port
Port const & port() const override
Definition: BaseWSPeer.h:89
ripple::BaseWSPeer
Represents an active WebSocket connection.
Definition: BaseWSPeer.h:40
std::next
T next(T... args)