diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index 69f25e1eb4..c2737993c2 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -82,10 +82,8 @@ PeerImp::PeerImp( , journal_(sink_) , p_journal_(p_sink_) , stream_ptr_(std::move(stream_ptr)) - , socket_(stream_ptr_->next_layer().socket()) - , stream_(*stream_ptr_) - , strand_(boost::asio::make_strand(socket_.get_executor())) - , timer_(waitable_timer{socket_.get_executor()}) + , strand_(boost::asio::make_strand(stream_ptr_->get_executor())) + , timer_(waitable_timer{stream_ptr_->get_executor()}) , remote_address_(slot->remote_endpoint()) , overlay_(overlay) , inbound_(true) @@ -215,7 +213,7 @@ PeerImp::stop() { if (!strand_.running_in_this_thread()) return post(strand_, std::bind(&PeerImp::stop, shared_from_this())); - if (socket_.is_open()) + if (socketOpen()) { // The rationale for using different severity levels is that // outbound connections are under our control and may be logged @@ -288,7 +286,7 @@ PeerImp::send(std::shared_ptr const& m) return; boost::asio::async_write( - stream_, + *stream_ptr_, boost::asio::buffer( send_queue_.front()->getBuffer(compressionEnabled_)), bind_executor( @@ -300,6 +298,12 @@ PeerImp::send(std::shared_ptr const& m) std::placeholders::_2))); } +bool +PeerImp::socketOpen() const +{ + return stream_ptr_->next_layer().socket().is_open(); +} + void PeerImp::sendTxQueue() { @@ -578,13 +582,13 @@ PeerImp::close() XRPL_ASSERT( strand_.running_in_this_thread(), "ripple::PeerImp::close : strand in this thread"); - if (socket_.is_open()) + if (socketOpen()) { detaching_ = true; // DEPRECATED try { timer_.cancel(); - socket_.close(); + stream_ptr_->lowest_layer().close(); } catch (boost::system::system_error const&) { @@ -613,7 +617,7 @@ PeerImp::fail(std::string const& reason) (void(Peer::*)(std::string const&)) & PeerImp::fail, shared_from_this(), reason)); - if (journal_.active(beast::severities::kWarning) && socket_.is_open()) + if (journal_.active(beast::severities::kWarning) && socketOpen()) { std::string const n = name(); JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n) @@ -628,7 +632,7 @@ PeerImp::fail(std::string const& name, error_code ec) XRPL_ASSERT( strand_.running_in_this_thread(), "ripple::PeerImp::fail : strand in this thread"); - if (socket_.is_open()) + if (socketOpen()) { JLOG(journal_.warn()) << name << " from " << toBase58(TokenType::NodePublic, publicKey_) @@ -644,7 +648,7 @@ PeerImp::gracefulClose() strand_.running_in_this_thread(), "ripple::PeerImp::gracefulClose : strand in this thread"); XRPL_ASSERT( - socket_.is_open(), "ripple::PeerImp::gracefulClose : socket is open"); + socketOpen(), "ripple::PeerImp::gracefulClose : socket is open"); XRPL_ASSERT( !gracefulClose_, "ripple::PeerImp::gracefulClose : socket is not closing"); @@ -652,7 +656,7 @@ PeerImp::gracefulClose() if (send_queue_.size() > 0) return; setTimer(); - stream_.async_shutdown(bind_executor( + stream_ptr_->async_shutdown(bind_executor( strand_, std::bind( &PeerImp::onShutdown, shared_from_this(), std::placeholders::_1))); @@ -703,7 +707,7 @@ PeerImp::makePrefix(id_t id) void PeerImp::onTimer(error_code const& ec) { - if (!socket_.is_open()) + if (!socketOpen()) return; if (ec == boost::asio::error::operation_aborted) @@ -826,14 +830,14 @@ PeerImp::doAccept() // Write the whole buffer and only start protocol when that's done. boost::asio::async_write( - stream_, + *stream_ptr_, write_buffer->data(), boost::asio::transfer_all(), bind_executor( strand_, [this, write_buffer, self = shared_from_this()]( error_code ec, std::size_t bytes_transferred) { - if (!socket_.is_open()) + if (!socketOpen()) return; if (ec == boost::asio::error::operation_aborted) return; @@ -903,7 +907,7 @@ PeerImp::doProtocolStart() void PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred) { - if (!socket_.is_open()) + if (!socketOpen()) return; if (ec == boost::asio::error::operation_aborted) return; @@ -943,7 +947,7 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred) if (ec) return fail("onReadMessage", ec); - if (!socket_.is_open()) + if (!socketOpen()) return; if (gracefulClose_) return; @@ -953,7 +957,7 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred) } // Timeout on writes only - stream_.async_read_some( + stream_ptr_->async_read_some( read_buffer_.prepare(std::max(Tuning::readBufferBytes, hint)), bind_executor( strand_, @@ -967,7 +971,7 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred) void PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred) { - if (!socket_.is_open()) + if (!socketOpen()) return; if (ec == boost::asio::error::operation_aborted) return; @@ -991,7 +995,7 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred) { // Timeout on writes only return boost::asio::async_write( - stream_, + *stream_ptr_, boost::asio::buffer( send_queue_.front()->getBuffer(compressionEnabled_)), bind_executor( @@ -1005,7 +1009,7 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred) if (gracefulClose_) { - return stream_.async_shutdown(bind_executor( + return stream_ptr_->async_shutdown(bind_executor( strand_, std::bind( &PeerImp::onShutdown, diff --git a/src/xrpld/overlay/detail/PeerImp.h b/src/xrpld/overlay/detail/PeerImp.h index 3d9a0c0b1e..f3f82125aa 100644 --- a/src/xrpld/overlay/detail/PeerImp.h +++ b/src/xrpld/overlay/detail/PeerImp.h @@ -60,7 +60,6 @@ public: private: using clock_type = std::chrono::steady_clock; using error_code = boost::system::error_code; - using socket_type = boost::asio::ip::tcp::socket; using middle_type = boost::beast::tcp_stream; using stream_type = boost::beast::ssl_stream; using address_type = boost::asio::ip::address; @@ -76,8 +75,6 @@ private: beast::Journal const journal_; beast::Journal const p_journal_; std::unique_ptr stream_ptr_; - socket_type& socket_; - stream_type& stream_; boost::asio::strand strand_; waitable_timer timer_; @@ -519,6 +516,9 @@ private: handleHaveTransactions( std::shared_ptr const& m); + bool + socketOpen() const; + public: //-------------------------------------------------------------------------- // @@ -667,10 +667,8 @@ PeerImp::PeerImp( , journal_(sink_) , p_journal_(p_sink_) , stream_ptr_(std::move(stream_ptr)) - , socket_(stream_ptr_->next_layer().socket()) - , stream_(*stream_ptr_) - , strand_(boost::asio::make_strand(socket_.get_executor())) - , timer_(waitable_timer{socket_.get_executor()}) + , strand_(boost::asio::make_strand(stream_ptr_->get_executor())) + , timer_(waitable_timer{stream_ptr_->get_executor()}) , remote_address_(slot->remote_endpoint()) , overlay_(overlay) , inbound_(false)