rippled
Loading...
Searching...
No Matches
short_read_test.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4
5 Copyright 2014 Ripple Labs Inc.
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
10
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <test/jtx/envconfig.h>
21
22#include <xrpl/basics/make_SSLContext.h>
23#include <xrpl/beast/core/CurrentThreadName.h>
24#include <xrpl/beast/unit_test.h>
25
26#include <boost/asio.hpp>
27#include <boost/asio/ssl.hpp>
28#include <boost/utility/in_place_factory.hpp>
29
30#include <functional>
31#include <optional>
32#include <thread>
33#include <utility>
34
35namespace ripple {
36/*
37
38Findings from the test:
39
40If the remote host calls async_shutdown then the local host's
41async_read will complete with eof.
42
43If both hosts call async_shutdown then the calls to async_shutdown
44will complete with eof.
45
46*/
47
49{
50private:
51 using io_context_type = boost::asio::io_context;
52 using strand_type = boost::asio::io_context::strand;
53 using timer_type =
54 boost::asio::basic_waitable_timer<std::chrono::steady_clock>;
55 using acceptor_type = boost::asio::ip::tcp::acceptor;
56 using socket_type = boost::asio::ip::tcp::socket;
57 using stream_type = boost::asio::ssl::stream<socket_type&>;
58 using error_code = boost::system::error_code;
59 using endpoint_type = boost::asio::ip::tcp::endpoint;
60 using address_type = boost::asio::ip::address;
61
67
68 template <class Streambuf>
69 static void
70 write(Streambuf& sb, std::string const& s)
71 {
72 using boost::asio::buffer;
73 using boost::asio::buffer_copy;
74 using boost::asio::buffer_size;
75 boost::asio::const_buffers_1 buf(s.data(), s.size());
76 sb.commit(buffer_copy(sb.prepare(buffer_size(buf)), buf));
77 }
78
79 //--------------------------------------------------------------------------
80
81 class Base
82 {
83 protected:
84 class Child
85 {
86 private:
88
89 public:
90 explicit Child(Base& base) : base_(base)
91 {
92 }
93
94 virtual ~Child()
95 {
96 base_.remove(this);
97 }
98
99 virtual void
100 close() = 0;
101 };
102
103 private:
107 bool closed_ = false;
108
109 public:
111 {
112 // Derived class must call wait() in the destructor
113 assert(list_.empty());
114 }
115
116 void
118 {
120 list_.emplace(child.get(), child);
121 }
122
123 void
124 remove(Child* child)
125 {
127 list_.erase(child);
128 if (list_.empty())
130 }
131
132 void
134 {
136 {
138 v.reserve(list_.size());
139 if (closed_)
140 return;
141 closed_ = true;
142 for (auto const& c : list_)
143 {
144 if (auto p = c.second.lock())
145 {
146 p->close();
147 // Must destroy shared_ptr outside the
148 // lock otherwise deadlock from the
149 // managed object's destructor.
150 v.emplace_back(std::move(p));
151 }
152 }
153 }
154 }
155
156 void
158 {
160 while (!list_.empty())
161 cond_.wait(lock);
162 }
163 };
164
165 //--------------------------------------------------------------------------
166
167 class Server : public Base
168 {
169 private:
172
174 {
180
181 explicit Acceptor(Server& server)
182 : Child(server)
183 , server_(server)
185 , acceptor_(
188 beast::IP::Address::from_string(
189 test::getEnvLocalhostAddr()),
190 0))
193 {
194 acceptor_.listen();
195 server_.endpoint_ = acceptor_.local_endpoint();
196 }
197
198 void
199 close() override
200 {
201 if (!strand_.running_in_this_thread())
202 return post(
203 strand_,
205 acceptor_.close();
206 }
207
208 void
210 {
211 acceptor_.async_accept(
212 socket_,
213 bind_executor(
214 strand_,
215 std::bind(
218 std::placeholders::_1)));
219 }
220
221 void
222 fail(std::string const& what, error_code ec)
223 {
224 if (acceptor_.is_open())
225 {
226 if (ec != boost::asio::error::operation_aborted)
227 test_.log << what << ": " << ec.message() << std::endl;
228 acceptor_.close();
229 }
230 }
231
232 void
234 {
235 if (ec)
236 return fail("accept", ec);
237 auto const p =
239 server_.add(p);
240 p->run();
241 acceptor_.async_accept(
242 socket_,
243 bind_executor(
244 strand_,
245 std::bind(
248 std::placeholders::_1)));
249 }
250 };
251
253 {
260 boost::asio::streambuf buf_;
261
262 Connection(Server& server, socket_type&& socket)
263 : Child(server)
264 , server_(server)
266 , socket_(std::move(socket))
270 {
271 }
272
273 void
274 close() override
275 {
276 if (!strand_.running_in_this_thread())
277 return post(
278 strand_,
280 if (socket_.is_open())
281 {
282 socket_.close();
283 timer_.cancel();
284 }
285 }
286
287 void
289 {
290 timer_.expires_from_now(std::chrono::seconds(3));
291 timer_.async_wait(bind_executor(
292 strand_,
293 std::bind(
296 std::placeholders::_1)));
297 stream_.async_handshake(
298 stream_type::server,
299 bind_executor(
300 strand_,
301 std::bind(
304 std::placeholders::_1)));
305 }
306
307 void
308 fail(std::string const& what, error_code ec)
309 {
310 if (socket_.is_open())
311 {
312 if (ec != boost::asio::error::operation_aborted)
313 test_.log << "[server] " << what << ": " << ec.message()
314 << std::endl;
315 socket_.close();
316 timer_.cancel();
317 }
318 }
319
320 void
322 {
323 if (ec == boost::asio::error::operation_aborted)
324 return;
325 if (ec)
326 return fail("timer", ec);
327 test_.log << "[server] timeout" << std::endl;
328 socket_.close();
329 }
330
331 void
333 {
334 if (ec)
335 return fail("handshake", ec);
336#if 1
337 boost::asio::async_read_until(
338 stream_,
339 buf_,
340 "\n",
341 bind_executor(
342 strand_,
343 std::bind(
346 std::placeholders::_1,
347 std::placeholders::_2)));
348#else
349 close();
350#endif
351 }
352
353 void
354 on_read(error_code ec, std::size_t bytes_transferred)
355 {
356 if (ec == boost::asio::error::eof)
357 {
358 server_.test_.log << "[server] read: EOF" << std::endl;
359 return stream_.async_shutdown(bind_executor(
360 strand_,
361 std::bind(
364 std::placeholders::_1)));
365 }
366 if (ec)
367 return fail("read", ec);
368
369 buf_.commit(bytes_transferred);
370 buf_.consume(bytes_transferred);
371 write(buf_, "BYE\n");
372 boost::asio::async_write(
373 stream_,
374 buf_.data(),
375 bind_executor(
376 strand_,
377 std::bind(
380 std::placeholders::_1,
381 std::placeholders::_2)));
382 }
383
384 void
385 on_write(error_code ec, std::size_t bytes_transferred)
386 {
387 buf_.consume(bytes_transferred);
388 if (ec)
389 return fail("write", ec);
390 stream_.async_shutdown(bind_executor(
391 strand_,
392 std::bind(
395 std::placeholders::_1)));
396 }
397
398 void
400 {
401 if (ec)
402 return fail("shutdown", ec);
403 socket_.close();
404 timer_.cancel();
405 }
406 };
407
408 public:
409 explicit Server(short_read_test& test) : test_(test)
410 {
411 auto const p = std::make_shared<Acceptor>(*this);
412 add(p);
413 p->run();
414 }
415
417 {
418 close();
419 wait();
420 }
421
422 endpoint_type const&
423 endpoint() const
424 {
425 return endpoint_;
426 }
427 };
428
429 //--------------------------------------------------------------------------
430 class Client : public Base
431
432 {
433 private:
435
437 {
444 boost::asio::streambuf buf_;
446
447 Connection(Client& client, endpoint_type const& ep)
448 : Child(client)
449 , client_(client)
455 , ep_(ep)
456 {
457 }
458
459 void
460 close() override
461 {
462 if (!strand_.running_in_this_thread())
463 return post(
464 strand_,
466 if (socket_.is_open())
467 {
468 socket_.close();
469 timer_.cancel();
470 }
471 }
472
473 void
475 {
476 timer_.expires_from_now(std::chrono::seconds(3));
477 timer_.async_wait(bind_executor(
478 strand_,
479 std::bind(
482 std::placeholders::_1)));
483 socket_.async_connect(
484 ep,
485 bind_executor(
486 strand_,
487 std::bind(
490 std::placeholders::_1)));
491 }
492
493 void
494 fail(std::string const& what, error_code ec)
495 {
496 if (socket_.is_open())
497 {
498 if (ec != boost::asio::error::operation_aborted)
499 test_.log << "[client] " << what << ": " << ec.message()
500 << std::endl;
501 socket_.close();
502 timer_.cancel();
503 }
504 }
505
506 void
508 {
509 if (ec == boost::asio::error::operation_aborted)
510 return;
511 if (ec)
512 return fail("timer", ec);
513 test_.log << "[client] timeout";
514 socket_.close();
515 }
516
517 void
519 {
520 if (ec)
521 return fail("connect", ec);
522 stream_.async_handshake(
523 stream_type::client,
524 bind_executor(
525 strand_,
526 std::bind(
529 std::placeholders::_1)));
530 }
531
532 void
534 {
535 if (ec)
536 return fail("handshake", ec);
537 write(buf_, "HELLO\n");
538
539#if 1
540 boost::asio::async_write(
541 stream_,
542 buf_.data(),
543 bind_executor(
544 strand_,
545 std::bind(
548 std::placeholders::_1,
549 std::placeholders::_2)));
550#else
551 stream_.async_shutdown(bind_executor(
552 strand_,
553 std::bind(
556 std::placeholders::_1)));
557#endif
558 }
559
560 void
561 on_write(error_code ec, std::size_t bytes_transferred)
562 {
563 buf_.consume(bytes_transferred);
564 if (ec)
565 return fail("write", ec);
566#if 1
567 boost::asio::async_read_until(
568 stream_,
569 buf_,
570 "\n",
571 bind_executor(
572 strand_,
573 std::bind(
576 std::placeholders::_1,
577 std::placeholders::_2)));
578#else
579 stream_.async_shutdown(bind_executor(
580 strand_,
581 std::bind(
584 std::placeholders::_1)));
585#endif
586 }
587
588 void
589 on_read(error_code ec, std::size_t bytes_transferred)
590 {
591 if (ec)
592 return fail("read", ec);
593 buf_.commit(bytes_transferred);
594 stream_.async_shutdown(bind_executor(
595 strand_,
596 std::bind(
599 std::placeholders::_1)));
600 }
601
602 void
604 {
605 if (ec)
606 return fail("shutdown", ec);
607 socket_.close();
608 timer_.cancel();
609 }
610 };
611
612 public:
613 Client(short_read_test& test, endpoint_type const& ep) : test_(test)
614 {
615 auto const p = std::make_shared<Connection>(*this, ep);
616 add(p);
617 p->run(ep);
618 }
619
621 {
622 close();
623 wait();
624 }
625 };
626
627public:
629 : work_(io_context_.get_executor())
630 , thread_(std::thread([this]() {
631 beast::setCurrentThreadName("io_context");
632 this->io_context_.run();
633 }))
635 {
636 }
637
639 {
640 work_.reset();
641 thread_.join();
642 }
643
644 void
645 run() override
646 {
647 Server s(*this);
648 Client c(*this, s.endpoint());
649 c.wait();
650 pass();
651 }
652};
653
654BEAST_DEFINE_TESTSUITE(short_read, overlay, ripple);
655
656} // namespace ripple
T bind(T... args)
A testsuite class.
Definition suite.h:55
log_os< char > log
Logging output stream.
Definition suite.h:152
void pass()
Record a successful test condition.
Definition suite.h:511
friend class thread
Definition suite.h:307
std::map< Child *, std::weak_ptr< Child > > list_
void add(std::shared_ptr< Child > const &child)
std::condition_variable cond_
Client(short_read_test &test, endpoint_type const &ep)
endpoint_type const & endpoint() const
std::shared_ptr< boost::asio::ssl::context > context_
void run() override
Runs the suite.
boost::asio::io_context::strand strand_type
boost::system::error_code error_code
boost::asio::ssl::stream< socket_type & > stream_type
boost::asio::io_context io_context_type
boost::asio::ip::tcp::endpoint endpoint_type
std::optional< boost::asio::executor_work_guard< boost::asio::executor > > work_
boost::asio::basic_waitable_timer< std::chrono::steady_clock > timer_type
boost::asio::ip::tcp::acceptor acceptor_type
static void write(Streambuf &sb, std::string const &s)
boost::asio::ip::address address_type
boost::asio::ip::tcp::socket socket_type
T data(T... args)
T emplace_back(T... args)
T endl(T... args)
T is_same_v
T join(T... args)
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:25
std::shared_ptr< boost::asio::ssl::context > make_SSLContext(std::string const &cipherList)
Create a self-signed SSL context that allows anonymous Diffie Hellman.
bool from_string(RangeSet< T > &rs, std::string const &s)
Convert the given styled string to a RangeSet.
Definition RangeSet.h:124
STL namespace.
T reserve(T... args)
T reset(T... args)
T size(T... args)
void on_read(error_code ec, std::size_t bytes_transferred)
void on_write(error_code ec, std::size_t bytes_transferred)
void fail(std::string const &what, error_code ec)
Connection(Client &client, endpoint_type const &ep)
void fail(std::string const &what, error_code ec)
void fail(std::string const &what, error_code ec)
void on_read(error_code ec, std::size_t bytes_transferred)
void on_write(error_code ec, std::size_t bytes_transferred)
Connection(Server &server, socket_type &&socket)