diff --git a/BaseWSPeer_8h_source.html b/BaseWSPeer_8h_source.html index f2514a7c8b..ac143c5e37 100644 --- a/BaseWSPeer_8h_source.html +++ b/BaseWSPeer_8h_source.html @@ -100,558 +100,563 @@ $(function() {
29 #include <boost/beast/core/multi_buffer.hpp>
30 #include <boost/beast/http/message.hpp>
31 #include <boost/beast/websocket.hpp>
-
32 #include <cassert>
-
33 #include <functional>
-
34 
-
35 namespace ripple {
-
36 
-
38 template <class Handler, class Impl>
-
39 class BaseWSPeer : public BasePeer<Handler, Impl>, public WSSession
-
40 {
-
41 protected:
-
42  using clock_type = std::chrono::system_clock;
-
43  using error_code = boost::system::error_code;
-
44  using endpoint_type = boost::asio::ip::tcp::endpoint;
-
45  using waitable_timer = boost::asio::basic_waitable_timer<clock_type>;
-
46  using BasePeer<Handler, Impl>::strand_;
-
47 
-
48 private:
-
49  friend class BasePeer<Handler, Impl>;
-
50 
-
51  http_request_type request_;
-
52  boost::beast::multi_buffer rb_;
-
53  boost::beast::multi_buffer wb_;
-
54  std::list<std::shared_ptr<WSMsg>> wq_;
-
55  bool do_close_ = false;
-
56  boost::beast::websocket::close_reason cr_;
-
57  waitable_timer timer_;
-
58  bool close_on_timer_ = false;
-
59  bool ping_active_ = false;
-
60  boost::beast::websocket::ping_data payload_;
-
61  error_code ec_;
-
62  std::function<
-
63  void(boost::beast::websocket::frame_type, boost::beast::string_view)>
-
64  control_callback_;
-
65 
-
66 public:
-
67  template <class Body, class Headers>
-
68  BaseWSPeer(
-
69  Port const& port,
-
70  Handler& handler,
-
71  boost::asio::executor const& executor,
-
72  waitable_timer timer,
-
73  endpoint_type remote_address,
-
74  boost::beast::http::request<Body, Headers>&& request,
-
75  beast::Journal journal);
-
76 
-
77  void
-
78  run() override;
-
79 
-
80  //
-
81  // WSSession
-
82  //
+
32 
+
33 #include <cassert>
+
34 #include <functional>
+
35 
+
36 namespace ripple {
+
37 
+
39 template <class Handler, class Impl>
+
40 class BaseWSPeer : public BasePeer<Handler, Impl>, public WSSession
+
41 {
+
42 protected:
+
43  using clock_type = std::chrono::system_clock;
+
44  using error_code = boost::system::error_code;
+
45  using endpoint_type = boost::asio::ip::tcp::endpoint;
+
46  using waitable_timer = boost::asio::basic_waitable_timer<clock_type>;
+
47  using BasePeer<Handler, Impl>::strand_;
+
48 
+
49 private:
+
50  friend class BasePeer<Handler, Impl>;
+
51 
+
52  http_request_type request_;
+
53  boost::beast::multi_buffer rb_;
+
54  boost::beast::multi_buffer wb_;
+
55  std::list<std::shared_ptr<WSMsg>> wq_;
+
59  bool do_close_ = false;
+
60  boost::beast::websocket::close_reason cr_;
+
61  waitable_timer timer_;
+
62  bool close_on_timer_ = false;
+
63  bool ping_active_ = false;
+
64  boost::beast::websocket::ping_data payload_;
+
65  error_code ec_;
+
66  std::function<
+
67  void(boost::beast::websocket::frame_type, boost::beast::string_view)>
+
68  control_callback_;
+
69 
+
70 public:
+
71  template <class Body, class Headers>
+
72  BaseWSPeer(
+
73  Port const& port,
+
74  Handler& handler,
+
75  boost::asio::executor const& executor,
+
76  waitable_timer timer,
+
77  endpoint_type remote_address,
+
78  boost::beast::http::request<Body, Headers>&& request,
+
79  beast::Journal journal);
+
80 
+
81  void
+
82  run() override;
83 
-
84  Port const&
-
85  port() const override
-
86  {
-
87  return this->port_;
-
88  }
-
89 
-
90  http_request_type const&
-
91  request() const override
-
92  {
-
93  return this->request_;
-
94  }
-
95 
-
96  boost::asio::ip::tcp::endpoint const&
-
97  remote_endpoint() const override
-
98  {
-
99  return this->remote_address_;
-
100  }
-
101 
-
102  void
-
103  send(std::shared_ptr<WSMsg> w) override;
-
104 
-
105  void
-
106  close() override;
-
107 
-
108  void
-
109  close(boost::beast::websocket::close_reason const& reason) override;
-
110 
-
111  void
-
112  complete() override;
-
113 
-
114 protected:
-
115  Impl&
-
116  impl()
-
117  {
-
118  return *static_cast<Impl*>(this);
-
119  }
-
120 
-
121  void
-
122  on_ws_handshake(error_code const& ec);
-
123 
-
124  void
-
125  do_write();
-
126 
-
127  void
-
128  on_write(error_code const& ec);
-
129 
-
130  void
-
131  on_write_fin(error_code const& ec);
-
132 
-
133  void
-
134  do_read();
-
135 
-
136  void
-
137  on_read(error_code const& ec);
-
138 
-
139  void
-
140  on_close(error_code const& ec);
-
141 
-
142  void
-
143  start_timer();
-
144 
-
145  void
-
146  cancel_timer();
-
147 
-
148  void
-
149  on_ping(error_code const& ec);
-
150 
-
151  void
-
152  on_ping_pong(
-
153  boost::beast::websocket::frame_type kind,
-
154  boost::beast::string_view payload);
-
155 
-
156  void
-
157  on_timer(error_code ec);
-
158 
-
159  template <class String>
+
84  //
+
85  // WSSession
+
86  //
+
87 
+
88  Port const&
+
89  port() const override
+
90  {
+
91  return this->port_;
+
92  }
+
93 
+
94  http_request_type const&
+
95  request() const override
+
96  {
+
97  return this->request_;
+
98  }
+
99 
+
100  boost::asio::ip::tcp::endpoint const&
+
101  remote_endpoint() const override
+
102  {
+
103  return this->remote_address_;
+
104  }
+
105 
+
106  void
+
107  send(std::shared_ptr<WSMsg> w) override;
+
108 
+
109  void
+
110  close() override;
+
111 
+
112  void
+
113  close(boost::beast::websocket::close_reason const& reason) override;
+
114 
+
115  void
+
116  complete() override;
+
117 
+
118 protected:
+
119  Impl&
+
120  impl()
+
121  {
+
122  return *static_cast<Impl*>(this);
+
123  }
+
124 
+
125  void
+
126  on_ws_handshake(error_code const& ec);
+
127 
+
128  void
+
129  do_write();
+
130 
+
131  void
+
132  on_write(error_code const& ec);
+
133 
+
134  void
+
135  on_write_fin(error_code const& ec);
+
136 
+
137  void
+
138  do_read();
+
139 
+
140  void
+
141  on_read(error_code const& ec);
+
142 
+
143  void
+
144  on_close(error_code const& ec);
+
145 
+
146  void
+
147  start_timer();
+
148 
+
149  void
+
150  cancel_timer();
+
151 
+
152  void
+
153  on_ping(error_code const& ec);
+
154 
+
155  void
+
156  on_ping_pong(
+
157  boost::beast::websocket::frame_type kind,
+
158  boost::beast::string_view payload);
+
159 
160  void
-
161  fail(error_code ec, String const& what);
-
162 };
-
163 
-
164 //------------------------------------------------------------------------------
-
165 
-
166 template <class Handler, class Impl>
-
167 template <class Body, class Headers>
-
168 BaseWSPeer<Handler, Impl>::BaseWSPeer(
-
169  Port const& port,
-
170  Handler& handler,
-
171  boost::asio::executor const& executor,
-
172  waitable_timer timer,
-
173  endpoint_type remote_address,
-
174  boost::beast::http::request<Body, Headers>&& request,
-
175  beast::Journal journal)
-
176  : BasePeer<Handler, Impl>(port, handler, executor, remote_address, journal)
-
177  , request_(std::move(request))
-
178  , timer_(std::move(timer))
-
179  , payload_("12345678") // ensures size is 8 bytes
-
180 {
-
181 }
-
182 
-
183 template <class Handler, class Impl>
-
184 void
-
185 BaseWSPeer<Handler, Impl>::run()
-
186 {
-
187  if (!strand_.running_in_this_thread())
-
188  return post(
-
189  strand_, std::bind(&BaseWSPeer::run, impl().shared_from_this()));
-
190  impl().ws_.set_option(port().pmd_options);
-
191  // Must manage the control callback memory outside of the `control_callback`
-
192  // function
-
193  control_callback_ = std::bind(
-
194  &BaseWSPeer::on_ping_pong,
-
195  this,
-
196  std::placeholders::_1,
-
197  std::placeholders::_2);
-
198  impl().ws_.control_callback(control_callback_);
-
199  start_timer();
-
200  close_on_timer_ = true;
-
201  impl().ws_.set_option(
-
202  boost::beast::websocket::stream_base::decorator([](auto& res) {
-
203  res.set(
-
204  boost::beast::http::field::server,
-
205  BuildInfo::getFullVersionString());
-
206  }));
-
207  impl().ws_.async_accept(
-
208  request_,
-
209  bind_executor(
-
210  strand_,
-
211  std::bind(
-
212  &BaseWSPeer::on_ws_handshake,
-
213  impl().shared_from_this(),
-
214  std::placeholders::_1)));
-
215 }
-
216 
-
217 template <class Handler, class Impl>
-
218 void
-
219 BaseWSPeer<Handler, Impl>::send(std::shared_ptr<WSMsg> w)
-
220 {
-
221  if (!strand_.running_in_this_thread())
-
222  return post(
-
223  strand_,
-
224  std::bind(
-
225  &BaseWSPeer::send, impl().shared_from_this(), std::move(w)));
-
226  if (do_close_)
-
227  return;
-
228  if (wq_.size() > port().ws_queue_limit)
-
229  {
-
230  cr_.code = safe_cast<decltype(cr_.code)>(
-
231  boost::beast::websocket::close_code::policy_error);
-
232  cr_.reason = "Policy error: client is too slow.";
-
233  JLOG(this->j_.info()) << cr_.reason;
-
234  wq_.erase(std::next(wq_.begin()), wq_.end());
-
235  close(cr_);
-
236  return;
-
237  }
-
238  wq_.emplace_back(std::move(w));
-
239  if (wq_.size() == 1)
-
240  on_write({});
-
241 }
-
242 
-
243 template <class Handler, class Impl>
-
244 void
-
245 BaseWSPeer<Handler, Impl>::close()
-
246 {
-
247  close(boost::beast::websocket::close_reason{});
-
248 }
-
249 
-
250 template <class Handler, class Impl>
-
251 void
-
252 BaseWSPeer<Handler, Impl>::close(
-
253  boost::beast::websocket::close_reason const& reason)
-
254 {
-
255  if (!strand_.running_in_this_thread())
-
256  return post(strand_, [self = impl().shared_from_this(), reason] {
-
257  self->close(reason);
-
258  });
-
259  do_close_ = true;
-
260  if (wq_.empty())
-
261  {
-
262  impl().ws_.async_close(
-
263  reason,
-
264  bind_executor(
-
265  strand_,
-
266  [self = impl().shared_from_this()](
-
267  boost::beast::error_code const& ec) {
-
268  self->on_close(ec);
-
269  }));
-
270  }
-
271  else
-
272  {
-
273  cr_ = reason;
-
274  }
-
275 }
-
276 
-
277 template <class Handler, class Impl>
-
278 void
-
279 BaseWSPeer<Handler, Impl>::complete()
-
280 {
-
281  if (!strand_.running_in_this_thread())
-
282  return post(
-
283  strand_,
-
284  std::bind(&BaseWSPeer::complete, impl().shared_from_this()));
-
285  do_read();
-
286 }
-
287 
-
288 template <class Handler, class Impl>
-
289 void
-
290 BaseWSPeer<Handler, Impl>::on_ws_handshake(error_code const& ec)
-
291 {
-
292  if (ec)
-
293  return fail(ec, "on_ws_handshake");
-
294  close_on_timer_ = false;
-
295  do_read();
-
296 }
-
297 
-
298 template <class Handler, class Impl>
-
299 void
-
300 BaseWSPeer<Handler, Impl>::do_write()
-
301 {
-
302  if (!strand_.running_in_this_thread())
-
303  return post(
-
304  strand_,
-
305  std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
-
306  on_write({});
-
307 }
-
308 
-
309 template <class Handler, class Impl>
-
310 void
-
311 BaseWSPeer<Handler, Impl>::on_write(error_code const& ec)
-
312 {
-
313  if (ec)
-
314  return fail(ec, "write");
-
315  auto& w = *wq_.front();
-
316  auto const result = w.prepare(
-
317  65536, std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
-
318  if (boost::indeterminate(result.first))
-
319  return;
-
320  start_timer();
-
321  if (!result.first)
-
322  impl().ws_.async_write_some(
-
323  static_cast<bool>(result.first),
-
324  result.second,
-
325  bind_executor(
-
326  strand_,
-
327  std::bind(
-
328  &BaseWSPeer::on_write,
-
329  impl().shared_from_this(),
-
330  std::placeholders::_1)));
-
331  else
-
332  impl().ws_.async_write_some(
-
333  static_cast<bool>(result.first),
-
334  result.second,
-
335  bind_executor(
-
336  strand_,
-
337  std::bind(
-
338  &BaseWSPeer::on_write_fin,
-
339  impl().shared_from_this(),
-
340  std::placeholders::_1)));
-
341 }
-
342 
-
343 template <class Handler, class Impl>
-
344 void
-
345 BaseWSPeer<Handler, Impl>::on_write_fin(error_code const& ec)
-
346 {
-
347  if (ec)
-
348  return fail(ec, "write_fin");
-
349  wq_.pop_front();
-
350  if (do_close_)
-
351  impl().ws_.async_close(
-
352  cr_,
-
353  bind_executor(
-
354  strand_,
-
355  std::bind(
-
356  &BaseWSPeer::on_close,
-
357  impl().shared_from_this(),
-
358  std::placeholders::_1)));
-
359  else if (!wq_.empty())
-
360  on_write({});
-
361 }
-
362 
-
363 template <class Handler, class Impl>
-
364 void
-
365 BaseWSPeer<Handler, Impl>::do_read()
-
366 {
-
367  if (!strand_.running_in_this_thread())
-
368  return post(
-
369  strand_,
-
370  std::bind(&BaseWSPeer::do_read, impl().shared_from_this()));
-
371  impl().ws_.async_read(
-
372  rb_,
-
373  bind_executor(
-
374  strand_,
-
375  std::bind(
-
376  &BaseWSPeer::on_read,
-
377  impl().shared_from_this(),
-
378  std::placeholders::_1)));
-
379 }
-
380 
-
381 template <class Handler, class Impl>
-
382 void
-
383 BaseWSPeer<Handler, Impl>::on_read(error_code const& ec)
-
384 {
-
385  if (ec == boost::beast::websocket::error::closed)
-
386  return on_close({});
-
387  if (ec)
-
388  return fail(ec, "read");
-
389  auto const& data = rb_.data();
-
390  std::vector<boost::asio::const_buffer> b;
-
391  b.reserve(std::distance(data.begin(), data.end()));
-
392  std::copy(data.begin(), data.end(), std::back_inserter(b));
-
393  this->handler_.onWSMessage(impl().shared_from_this(), b);
-
394  rb_.consume(rb_.size());
-
395 }
-
396 
-
397 template <class Handler, class Impl>
-
398 void
-
399 BaseWSPeer<Handler, Impl>::on_close(error_code const& ec)
-
400 {
-
401  cancel_timer();
-
402 }
-
403 
-
404 template <class Handler, class Impl>
-
405 void
-
406 BaseWSPeer<Handler, Impl>::start_timer()
-
407 {
-
408  // Max seconds without completing a message
-
409  static constexpr std::chrono::seconds timeout{30};
-
410  static constexpr std::chrono::seconds timeoutLocal{3};
-
411  error_code ec;
-
412  timer_.expires_from_now(
-
413  remote_endpoint().address().is_loopback() ? timeoutLocal : timeout, ec);
-
414  if (ec)
-
415  return fail(ec, "start_timer");
-
416  timer_.async_wait(bind_executor(
-
417  strand_,
-
418  std::bind(
-
419  &BaseWSPeer<Handler, Impl>::on_timer,
-
420  impl().shared_from_this(),
-
421  std::placeholders::_1)));
-
422 }
-
423 
-
424 // Convenience for discarding the error code
-
425 template <class Handler, class Impl>
-
426 void
-
427 BaseWSPeer<Handler, Impl>::cancel_timer()
-
428 {
-
429  error_code ec;
-
430  timer_.cancel(ec);
-
431 }
-
432 
+
161  on_timer(error_code ec);
+
162 
+
163  template <class String>
+
164  void
+
165  fail(error_code ec, String const& what);
+
166 };
+
167 
+
168 //------------------------------------------------------------------------------
+
169 
+
170 template <class Handler, class Impl>
+
171 template <class Body, class Headers>
+
172 BaseWSPeer<Handler, Impl>::BaseWSPeer(
+
173  Port const& port,
+
174  Handler& handler,
+
175  boost::asio::executor const& executor,
+
176  waitable_timer timer,
+
177  endpoint_type remote_address,
+
178  boost::beast::http::request<Body, Headers>&& request,
+
179  beast::Journal journal)
+
180  : BasePeer<Handler, Impl>(port, handler, executor, remote_address, journal)
+
181  , request_(std::move(request))
+
182  , timer_(std::move(timer))
+
183  , payload_("12345678") // ensures size is 8 bytes
+
184 {
+
185 }
+
186 
+
187 template <class Handler, class Impl>
+
188 void
+
189 BaseWSPeer<Handler, Impl>::run()
+
190 {
+
191  if (!strand_.running_in_this_thread())
+
192  return post(
+
193  strand_, std::bind(&BaseWSPeer::run, impl().shared_from_this()));
+
194  impl().ws_.set_option(port().pmd_options);
+
195  // Must manage the control callback memory outside of the `control_callback`
+
196  // function
+
197  control_callback_ = std::bind(
+
198  &BaseWSPeer::on_ping_pong,
+
199  this,
+
200  std::placeholders::_1,
+
201  std::placeholders::_2);
+
202  impl().ws_.control_callback(control_callback_);
+
203  start_timer();
+
204  close_on_timer_ = true;
+
205  impl().ws_.set_option(
+
206  boost::beast::websocket::stream_base::decorator([](auto& res) {
+
207  res.set(
+
208  boost::beast::http::field::server,
+
209  BuildInfo::getFullVersionString());
+
210  }));
+
211  impl().ws_.async_accept(
+
212  request_,
+
213  bind_executor(
+
214  strand_,
+
215  std::bind(
+
216  &BaseWSPeer::on_ws_handshake,
+
217  impl().shared_from_this(),
+
218  std::placeholders::_1)));
+
219 }
+
220 
+
221 template <class Handler, class Impl>
+
222 void
+
223 BaseWSPeer<Handler, Impl>::send(std::shared_ptr<WSMsg> w)
+
224 {
+
225  if (!strand_.running_in_this_thread())
+
226  return post(
+
227  strand_,
+
228  std::bind(
+
229  &BaseWSPeer::send, impl().shared_from_this(), std::move(w)));
+
230  if (do_close_)
+
231  return;
+
232  if (wq_.size() > port().ws_queue_limit)
+
233  {
+
234  cr_.code = safe_cast<decltype(cr_.code)>(
+
235  boost::beast::websocket::close_code::policy_error);
+
236  cr_.reason = "Policy error: client is too slow.";
+
237  JLOG(this->j_.info()) << cr_.reason;
+
238  wq_.erase(std::next(wq_.begin()), wq_.end());
+
239  close(cr_);
+
240  return;
+
241  }
+
242  wq_.emplace_back(std::move(w));
+
243  if (wq_.size() == 1)
+
244  on_write({});
+
245 }
+
246 
+
247 template <class Handler, class Impl>
+
248 void
+
249 BaseWSPeer<Handler, Impl>::close()
+
250 {
+
251  close(boost::beast::websocket::close_reason{});
+
252 }
+
253 
+
254 template <class Handler, class Impl>
+
255 void
+
256 BaseWSPeer<Handler, Impl>::close(
+
257  boost::beast::websocket::close_reason const& reason)
+
258 {
+
259  if (!strand_.running_in_this_thread())
+
260  return post(strand_, [self = impl().shared_from_this(), reason] {
+
261  self->close(reason);
+
262  });
+
263  if (do_close_)
+
264  return;
+
265  do_close_ = true;
+
266  if (wq_.empty())
+
267  {
+
268  impl().ws_.async_close(
+
269  reason,
+
270  bind_executor(
+
271  strand_,
+
272  [self = impl().shared_from_this()](
+
273  boost::beast::error_code const& ec) {
+
274  self->on_close(ec);
+
275  }));
+
276  }
+
277  else
+
278  {
+
279  cr_ = reason;
+
280  }
+
281 }
+
282 
+
283 template <class Handler, class Impl>
+
284 void
+
285 BaseWSPeer<Handler, Impl>::complete()
+
286 {
+
287  if (!strand_.running_in_this_thread())
+
288  return post(
+
289  strand_,
+
290  std::bind(&BaseWSPeer::complete, impl().shared_from_this()));
+
291  do_read();
+
292 }
+
293 
+
294 template <class Handler, class Impl>
+
295 void
+
296 BaseWSPeer<Handler, Impl>::on_ws_handshake(error_code const& ec)
+
297 {
+
298  if (ec)
+
299  return fail(ec, "on_ws_handshake");
+
300  close_on_timer_ = false;
+
301  do_read();
+
302 }
+
303 
+
304 template <class Handler, class Impl>
+
305 void
+
306 BaseWSPeer<Handler, Impl>::do_write()
+
307 {
+
308  if (!strand_.running_in_this_thread())
+
309  return post(
+
310  strand_,
+
311  std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
+
312  on_write({});
+
313 }
+
314 
+
315 template <class Handler, class Impl>
+
316 void
+
317 BaseWSPeer<Handler, Impl>::on_write(error_code const& ec)
+
318 {
+
319  if (ec)
+
320  return fail(ec, "write");
+
321  auto& w = *wq_.front();
+
322  auto const result = w.prepare(
+
323  65536, std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
+
324  if (boost::indeterminate(result.first))
+
325  return;
+
326  start_timer();
+
327  if (!result.first)
+
328  impl().ws_.async_write_some(
+
329  static_cast<bool>(result.first),
+
330  result.second,
+
331  bind_executor(
+
332  strand_,
+
333  std::bind(
+
334  &BaseWSPeer::on_write,
+
335  impl().shared_from_this(),
+
336  std::placeholders::_1)));
+
337  else
+
338  impl().ws_.async_write_some(
+
339  static_cast<bool>(result.first),
+
340  result.second,
+
341  bind_executor(
+
342  strand_,
+
343  std::bind(
+
344  &BaseWSPeer::on_write_fin,
+
345  impl().shared_from_this(),
+
346  std::placeholders::_1)));
+
347 }
+
348 
+
349 template <class Handler, class Impl>
+
350 void
+
351 BaseWSPeer<Handler, Impl>::on_write_fin(error_code const& ec)
+
352 {
+
353  if (ec)
+
354  return fail(ec, "write_fin");
+
355  wq_.pop_front();
+
356  if (do_close_)
+
357  {
+
358  impl().ws_.async_close(
+
359  cr_,
+
360  bind_executor(
+
361  strand_,
+
362  std::bind(
+
363  &BaseWSPeer::on_close,
+
364  impl().shared_from_this(),
+
365  std::placeholders::_1)));
+
366  }
+
367  else if (!wq_.empty())
+
368  on_write({});
+
369 }
+
370 
+
371 template <class Handler, class Impl>
+
372 void
+
373 BaseWSPeer<Handler, Impl>::do_read()
+
374 {
+
375  if (!strand_.running_in_this_thread())
+
376  return post(
+
377  strand_,
+
378  std::bind(&BaseWSPeer::do_read, impl().shared_from_this()));
+
379  impl().ws_.async_read(
+
380  rb_,
+
381  bind_executor(
+
382  strand_,
+
383  std::bind(
+
384  &BaseWSPeer::on_read,
+
385  impl().shared_from_this(),
+
386  std::placeholders::_1)));
+
387 }
+
388 
+
389 template <class Handler, class Impl>
+
390 void
+
391 BaseWSPeer<Handler, Impl>::on_read(error_code const& ec)
+
392 {
+
393  if (ec == boost::beast::websocket::error::closed)
+
394  return on_close({});
+
395  if (ec)
+
396  return fail(ec, "read");
+
397  auto const& data = rb_.data();
+
398  std::vector<boost::asio::const_buffer> b;
+
399  b.reserve(std::distance(data.begin(), data.end()));
+
400  std::copy(data.begin(), data.end(), std::back_inserter(b));
+
401  this->handler_.onWSMessage(impl().shared_from_this(), b);
+
402  rb_.consume(rb_.size());
+
403 }
+
404 
+
405 template <class Handler, class Impl>
+
406 void
+
407 BaseWSPeer<Handler, Impl>::on_close(error_code const& ec)
+
408 {
+
409  cancel_timer();
+
410 }
+
411 
+
412 template <class Handler, class Impl>
+
413 void
+
414 BaseWSPeer<Handler, Impl>::start_timer()
+
415 {
+
416  // Max seconds without completing a message
+
417  static constexpr std::chrono::seconds timeout{30};
+
418  static constexpr std::chrono::seconds timeoutLocal{3};
+
419  error_code ec;
+
420  timer_.expires_from_now(
+
421  remote_endpoint().address().is_loopback() ? timeoutLocal : timeout, ec);
+
422  if (ec)
+
423  return fail(ec, "start_timer");
+
424  timer_.async_wait(bind_executor(
+
425  strand_,
+
426  std::bind(
+
427  &BaseWSPeer<Handler, Impl>::on_timer,
+
428  impl().shared_from_this(),
+
429  std::placeholders::_1)));
+
430 }
+
431 
+
432 // Convenience for discarding the error code
433 template <class Handler, class Impl>
434 void
-
435 BaseWSPeer<Handler, Impl>::on_ping(error_code const& ec)
+
435 BaseWSPeer<Handler, Impl>::cancel_timer()
436 {
-
437  if (ec == boost::asio::error::operation_aborted)
-
438  return;
-
439  ping_active_ = false;
-
440  if (!ec)
-
441  return;
-
442  fail(ec, "on_ping");
-
443 }
-
444 
-
445 template <class Handler, class Impl>
-
446 void
-
447 BaseWSPeer<Handler, Impl>::on_ping_pong(
-
448  boost::beast::websocket::frame_type kind,
-
449  boost::beast::string_view payload)
-
450 {
-
451  if (kind == boost::beast::websocket::frame_type::pong)
-
452  {
-
453  boost::beast::string_view p(payload_.begin());
-
454  if (payload == p)
-
455  {
-
456  close_on_timer_ = false;
-
457  JLOG(this->j_.trace()) << "got matching pong";
-
458  }
-
459  else
-
460  {
-
461  JLOG(this->j_.trace()) << "got pong";
-
462  }
-
463  }
-
464 }
-
465 
-
466 template <class Handler, class Impl>
-
467 void
-
468 BaseWSPeer<Handler, Impl>::on_timer(error_code ec)
-
469 {
-
470  if (ec == boost::asio::error::operation_aborted)
-
471  return;
-
472  if (!ec)
-
473  {
-
474  if (!close_on_timer_ || !ping_active_)
-
475  {
-
476  start_timer();
-
477  close_on_timer_ = true;
-
478  ping_active_ = true;
-
479  // cryptographic is probably overkill..
-
480  beast::rngfill(payload_.begin(), payload_.size(), crypto_prng());
-
481  impl().ws_.async_ping(
-
482  payload_,
-
483  bind_executor(
-
484  strand_,
-
485  std::bind(
-
486  &BaseWSPeer::on_ping,
-
487  impl().shared_from_this(),
-
488  std::placeholders::_1)));
-
489  JLOG(this->j_.trace()) << "sent ping";
-
490  return;
-
491  }
-
492  ec = boost::system::errc::make_error_code(
-
493  boost::system::errc::timed_out);
-
494  }
-
495  fail(ec, "timer");
-
496 }
-
497 
-
498 template <class Handler, class Impl>
-
499 template <class String>
-
500 void
-
501 BaseWSPeer<Handler, Impl>::fail(error_code ec, String const& what)
-
502 {
-
503  assert(strand_.running_in_this_thread());
-
504 
-
505  cancel_timer();
-
506  if (!ec_ && ec != boost::asio::error::operation_aborted)
-
507  {
-
508  ec_ = ec;
-
509  JLOG(this->j_.trace()) << what << ": " << ec.message();
-
510  ripple::get_lowest_layer(impl().ws_).socket().close(ec);
-
511  }
-
512 }
-
513 
-
514 } // namespace ripple
-
515 
-
516 #endif
+
437  error_code ec;
+
438  timer_.cancel(ec);
+
439 }
+
440 
+
441 template <class Handler, class Impl>
+
442 void
+
443 BaseWSPeer<Handler, Impl>::on_ping(error_code const& ec)
+
444 {
+
445  if (ec == boost::asio::error::operation_aborted)
+
446  return;
+
447  ping_active_ = false;
+
448  if (!ec)
+
449  return;
+
450  fail(ec, "on_ping");
+
451 }
+
452 
+
453 template <class Handler, class Impl>
+
454 void
+
455 BaseWSPeer<Handler, Impl>::on_ping_pong(
+
456  boost::beast::websocket::frame_type kind,
+
457  boost::beast::string_view payload)
+
458 {
+
459  if (kind == boost::beast::websocket::frame_type::pong)
+
460  {
+
461  boost::beast::string_view p(payload_.begin());
+
462  if (payload == p)
+
463  {
+
464  close_on_timer_ = false;
+
465  JLOG(this->j_.trace()) << "got matching pong";
+
466  }
+
467  else
+
468  {
+
469  JLOG(this->j_.trace()) << "got pong";
+
470  }
+
471  }
+
472 }
+
473 
+
474 template <class Handler, class Impl>
+
475 void
+
476 BaseWSPeer<Handler, Impl>::on_timer(error_code ec)
+
477 {
+
478  if (ec == boost::asio::error::operation_aborted)
+
479  return;
+
480  if (!ec)
+
481  {
+
482  if (!close_on_timer_ || !ping_active_)
+
483  {
+
484  start_timer();
+
485  close_on_timer_ = true;
+
486  ping_active_ = true;
+
487  // cryptographic is probably overkill..
+
488  beast::rngfill(payload_.begin(), payload_.size(), crypto_prng());
+
489  impl().ws_.async_ping(
+
490  payload_,
+
491  bind_executor(
+
492  strand_,
+
493  std::bind(
+
494  &BaseWSPeer::on_ping,
+
495  impl().shared_from_this(),
+
496  std::placeholders::_1)));
+
497  JLOG(this->j_.trace()) << "sent ping";
+
498  return;
+
499  }
+
500  ec = boost::system::errc::make_error_code(
+
501  boost::system::errc::timed_out);
+
502  }
+
503  fail(ec, "timer");
+
504 }
+
505 
+
506 template <class Handler, class Impl>
+
507 template <class String>
+
508 void
+
509 BaseWSPeer<Handler, Impl>::fail(error_code ec, String const& what)
+
510 {
+
511  assert(strand_.running_in_this_thread());
+
512 
+
513  cancel_timer();
+
514  if (!ec_ && ec != boost::asio::error::operation_aborted)
+
515  {
+
516  ec_ = ec;
+
517  JLOG(this->j_.trace()) << what << ": " << ec.message();
+
518  ripple::get_lowest_layer(impl().ws_).socket().close(ec);
+
519  }
+
520 }
+
521 
+
522 } // namespace ripple
+
523 
+
524 #endif
-
ripple::BaseWSPeer::do_read
void do_read()
Definition: BaseWSPeer.h:365
-
ripple::BaseWSPeer::on_ws_handshake
void on_ws_handshake(error_code const &ec)
Definition: BaseWSPeer.h:290
+
ripple::BaseWSPeer::do_read
void do_read()
Definition: BaseWSPeer.h:373
+
ripple::BaseWSPeer::on_ws_handshake
void on_ws_handshake(error_code const &ec)
Definition: BaseWSPeer.h:296
std::chrono::system_clock
-
ripple::BaseWSPeer::close
void close() override
Definition: BaseWSPeer.h:245
-
ripple::BaseWSPeer::on_timer
void on_timer(error_code ec)
Definition: BaseWSPeer.h:468
-
ripple::BaseWSPeer::error_code
boost::system::error_code error_code
Definition: BaseWSPeer.h:43
+
ripple::BaseWSPeer::close
void close() override
Definition: BaseWSPeer.h:249
+
ripple::BaseWSPeer::on_timer
void on_timer(error_code ec)
Definition: BaseWSPeer.h:476
+
ripple::BaseWSPeer::error_code
boost::system::error_code error_code
Definition: BaseWSPeer.h:44
std::bind
T bind(T... args)
-
ripple::BaseWSPeer::close_on_timer_
bool close_on_timer_
Definition: BaseWSPeer.h:58
+
ripple::BaseWSPeer::close_on_timer_
bool close_on_timer_
Definition: BaseWSPeer.h:62
std::shared_ptr
STL class.
std::list
STL class.
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:308
-
ripple::BaseWSPeer::impl
Impl & impl()
Definition: BaseWSPeer.h:116
-
ripple::BaseWSPeer::on_ping_pong
void on_ping_pong(boost::beast::websocket::frame_type kind, boost::beast::string_view payload)
Definition: BaseWSPeer.h:447
+
ripple::BaseWSPeer::impl
Impl & impl()
Definition: BaseWSPeer.h:120
+
ripple::BaseWSPeer::on_ping_pong
void on_ping_pong(boost::beast::websocket::frame_type kind, boost::beast::string_view payload)
Definition: BaseWSPeer.h:455
functional
ripple::BasePeer
Definition: BasePeer.h:37
std::vector::reserve
T reserve(T... args)
std::vector
STL class.
-
ripple::BaseWSPeer::wb_
boost::beast::multi_buffer wb_
Definition: BaseWSPeer.h:53
-
ripple::BaseWSPeer::timer_
waitable_timer timer_
Definition: BaseWSPeer.h:57
+
ripple::BaseWSPeer::wb_
boost::beast::multi_buffer wb_
Definition: BaseWSPeer.h:54
+
ripple::BaseWSPeer::timer_
waitable_timer timer_
Definition: BaseWSPeer.h:61
std::back_inserter
T back_inserter(T... args)
-
ripple::BaseWSPeer::ping_active_
bool ping_active_
Definition: BaseWSPeer.h:59
+
ripple::BaseWSPeer::ping_active_
bool ping_active_
Definition: BaseWSPeer.h:63
std::chrono::seconds
ripple::crypto_prng
csprng_engine & crypto_prng()
The default cryptographically secure PRNG.
Definition: csprng.cpp:99
std::distance
T distance(T... args)
-
ripple::BaseWSPeer::cr_
boost::beast::websocket::close_reason cr_
Definition: BaseWSPeer.h:56
+
ripple::BaseWSPeer::cr_
boost::beast::websocket::close_reason cr_
Definition: BaseWSPeer.h:60
std::function
-
ripple::BaseWSPeer::on_write
void on_write(error_code const &ec)
Definition: BaseWSPeer.h:311
-
ripple::BaseWSPeer::wq_
std::list< std::shared_ptr< WSMsg > > wq_
Definition: BaseWSPeer.h:54
-
ripple::BaseWSPeer::run
void run() override
Definition: BaseWSPeer.h:185
-
ripple::BaseWSPeer::control_callback_
std::function< void(boost::beast::websocket::frame_type, boost::beast::string_view)> control_callback_
Definition: BaseWSPeer.h:64
+
ripple::BaseWSPeer::on_write
void on_write(error_code const &ec)
Definition: BaseWSPeer.h:317
+
ripple::BaseWSPeer::wq_
std::list< std::shared_ptr< WSMsg > > wq_
Definition: BaseWSPeer.h:55
+
ripple::BaseWSPeer::run
void run() override
Definition: BaseWSPeer.h:189
+
ripple::BaseWSPeer::control_callback_
std::function< void(boost::beast::websocket::frame_type, boost::beast::string_view)> control_callback_
Definition: BaseWSPeer.h:68
ripple::BasePeer< Handler, PlainWSPeer< Handler > >::waitable_timer
boost::asio::basic_waitable_timer< clock_type > waitable_timer
Definition: BasePeer.h:43
-
ripple::BaseWSPeer::ec_
error_code ec_
Definition: BaseWSPeer.h:61
-
ripple::BaseWSPeer::start_timer
void start_timer()
Definition: BaseWSPeer.h:406
-
ripple::BaseWSPeer::on_ping
void on_ping(error_code const &ec)
Definition: BaseWSPeer.h:435
-
ripple::BaseWSPeer::do_close_
bool do_close_
Definition: BaseWSPeer.h:55
+
ripple::BaseWSPeer::ec_
error_code ec_
Definition: BaseWSPeer.h:65
+
ripple::BaseWSPeer::start_timer
void start_timer()
Definition: BaseWSPeer.h:414
+
ripple::BaseWSPeer::on_ping
void on_ping(error_code const &ec)
Definition: BaseWSPeer.h:443
+
ripple::BaseWSPeer::do_close_
bool do_close_
The socket has been closed, or will close after the next write finishes.
Definition: BaseWSPeer.h:59
ripple::BasePeer< Handler, PlainWSPeer< Handler > >::error_code
boost::system::error_code error_code
Definition: BasePeer.h:41
-
ripple::BaseWSPeer::send
void send(std::shared_ptr< WSMsg > w) override
Send a WebSockets message.
Definition: BaseWSPeer.h:219
-
ripple::BaseWSPeer::payload_
boost::beast::websocket::ping_data payload_
Definition: BaseWSPeer.h:60
-
ripple::BaseWSPeer::remote_endpoint
boost::asio::ip::tcp::endpoint const & remote_endpoint() const override
Definition: BaseWSPeer.h:97
-
ripple::BaseWSPeer::on_write_fin
void on_write_fin(error_code const &ec)
Definition: BaseWSPeer.h:345
+
ripple::BaseWSPeer::send
void send(std::shared_ptr< WSMsg > w) override
Send a WebSockets message.
Definition: BaseWSPeer.h:223
+
ripple::BaseWSPeer::payload_
boost::beast::websocket::ping_data payload_
Definition: BaseWSPeer.h:64
+
ripple::BaseWSPeer::remote_endpoint
boost::asio::ip::tcp::endpoint const & remote_endpoint() const override
Definition: BaseWSPeer.h:101
+
ripple::BaseWSPeer::on_write_fin
void on_write_fin(error_code const &ec)
Definition: BaseWSPeer.h:351
ripple::safe_cast
constexpr std::enable_if_t< std::is_same_v< typename Dest::unit_type, typename Src::unit_type > &&std::is_integral_v< typename Dest::value_type > &&std::is_integral_v< typename Src::value_type >, Dest > safe_cast(Src s) noexcept
Definition: FeeUnits.h:536
-
ripple::BaseWSPeer::on_close
void on_close(error_code const &ec)
Definition: BaseWSPeer.h:399
+
ripple::BaseWSPeer::on_close
void on_close(error_code const &ec)
Definition: BaseWSPeer.h:407
beast::Journal::info
Stream info() const
Definition: Journal.h:320
ripple::BasePeer::port_
Port const & port_
Definition: BasePeer.h:45
std::copy
T copy(T... args)
-
ripple::BaseWSPeer::do_write
void do_write()
Definition: BaseWSPeer.h:300
+
ripple::BaseWSPeer::do_write
void do_write()
Definition: BaseWSPeer.h:306
ripple::BasePeer< Handler, PlainWSPeer< Handler > >::endpoint_type
boost::asio::ip::tcp::endpoint endpoint_type
Definition: BasePeer.h:42
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
ripple::Port
Configuration information for a Server listening port.
Definition: Port.h:48
-
ripple::BaseWSPeer::complete
void complete() override
Indicate that the response is complete.
Definition: BaseWSPeer.h:279
-
ripple::BaseWSPeer::fail
void fail(error_code ec, String const &what)
Definition: BaseWSPeer.h:501
+
ripple::BaseWSPeer::complete
void complete() override
Indicate that the response is complete.
Definition: BaseWSPeer.h:285
+
ripple::BaseWSPeer::fail
void fail(error_code ec, String const &what)
Definition: BaseWSPeer.h:509
ripple::WSSession
Definition: WSSession.h:107
-
ripple::BaseWSPeer::cancel_timer
void cancel_timer()
Definition: BaseWSPeer.h:427
+
ripple::BaseWSPeer::cancel_timer
void cancel_timer()
Definition: BaseWSPeer.h:435
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
beast::rngfill
void rngfill(void *buffer, std::size_t bytes, Generator &g)
Definition: rngfill.h:33
std
STL namespace.
cassert
-
ripple::BaseWSPeer::request
http_request_type const & request() const override
Definition: BaseWSPeer.h:91
-
ripple::BaseWSPeer::on_read
void on_read(error_code const &ec)
Definition: BaseWSPeer.h:383
+
ripple::BaseWSPeer::request
http_request_type const & request() const override
Definition: BaseWSPeer.h:95
+
ripple::BaseWSPeer::on_read
void on_read(error_code const &ec)
Definition: BaseWSPeer.h:391
ripple::BasePeer::remote_address_
endpoint_type remote_address_
Definition: BasePeer.h:47
-
ripple::BaseWSPeer::rb_
boost::beast::multi_buffer rb_
Definition: BaseWSPeer.h:52
+
ripple::BaseWSPeer::rb_
boost::beast::multi_buffer rb_
Definition: BaseWSPeer.h:53
ripple::get_lowest_layer
decltype(auto) get_lowest_layer(T &t) noexcept
Definition: LowestLayer.h:35
ripple::http_request_type
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition: Handshake.h:47
-
ripple::BaseWSPeer::BaseWSPeer
BaseWSPeer(Port const &port, Handler &handler, boost::asio::executor const &executor, waitable_timer timer, endpoint_type remote_address, boost::beast::http::request< Body, Headers > &&request, beast::Journal journal)
Definition: BaseWSPeer.h:168
-
ripple::BaseWSPeer::request_
http_request_type request_
Definition: BaseWSPeer.h:51
-
ripple::BaseWSPeer::port
Port const & port() const override
Definition: BaseWSPeer.h:85
-
ripple::BaseWSPeer
Represents an active WebSocket connection.
Definition: BaseWSPeer.h:39
+
ripple::BaseWSPeer::BaseWSPeer
BaseWSPeer(Port const &port, Handler &handler, boost::asio::executor const &executor, waitable_timer timer, endpoint_type remote_address, boost::beast::http::request< Body, Headers > &&request, beast::Journal journal)
Definition: BaseWSPeer.h:172
+
ripple::BaseWSPeer::request_
http_request_type request_
Definition: BaseWSPeer.h:52
+
ripple::BaseWSPeer::port
Port const & port() const override
Definition: BaseWSPeer.h:89
+
ripple::BaseWSPeer
Represents an active WebSocket connection.
Definition: BaseWSPeer.h:40
std::next
T next(T... args)