diff --git a/src/xrpld/overlay/detail/ConnectAttempt.cpp b/src/xrpld/overlay/detail/ConnectAttempt.cpp index 9623cb4e1e..4cdc87e0e1 100644 --- a/src/xrpld/overlay/detail/ConnectAttempt.cpp +++ b/src/xrpld/overlay/detail/ConnectAttempt.cpp @@ -25,6 +25,7 @@ #include #include +#include namespace ripple { @@ -70,11 +71,13 @@ ConnectAttempt::stop() if (!strand_.running_in_this_thread()) return strand_.post( std::bind(&ConnectAttempt::stop, shared_from_this())); - if (socket_.is_open()) - { - JLOG(journal_.debug()) << "Stop"; - } - close(); + + if (!socket_.is_open()) + return; + + JLOG(journal_.debug()) << "Stop"; + + shutdown(); } void @@ -90,6 +93,55 @@ ConnectAttempt::run() //------------------------------------------------------------------------------ +void +ConnectAttempt::shutdown() +{ + XRPL_ASSERT( + strand_.running_in_this_thread(), + "ripple::ConnectAttempt::shutdown: strand in this thread"); + + if (!socket_.is_open() || shutdown_) + return; + + shutdown_ = true; + + setTimer(); + + error_code ec; + // cancel asynchronous I/O + socket_.cancel(ec); + + // gracefully shutdown the SSL socket, performing a shutdown handshake + stream_.async_shutdown(bind_executor( + strand_, + std::bind( + &ConnectAttempt::onShutdown, + shared_from_this(), + std::placeholders::_1))); +} + +void +ConnectAttempt::onShutdown(error_code ec) +{ + cancelTimer(); + + if (ec) + { + // - eof: the stream was cleanly closed + // - operation_aborted: an expired timer (slow shutdown) + // - stream_truncated: the tcp connection closed (no handshake) it could + // occur if a peer does not perform a graceful disconnect + // - broken_pipe: the peer is gone + if (ec != boost::asio::error::eof && + ec != boost::asio::error::operation_aborted) + { + JLOG(journal_.warn()) << "onShutdown: " << ec.message(); + } + } + + close(); +} + void ConnectAttempt::close() { @@ -104,21 +156,21 @@ ConnectAttempt::close() error_code ec; socket_.close(ec); - JLOG(journal_.debug()) << "Closed"; + JLOG(journal_.debug()) << "close: Closed"; } void ConnectAttempt::fail(std::string const& reason) { JLOG(journal_.debug()) << reason; - close(); + shutdown(); } void ConnectAttempt::fail(std::string const& name, error_code ec) { JLOG(journal_.debug()) << name << ": " << ec.message(); - close(); + shutdown(); } void @@ -150,14 +202,17 @@ ConnectAttempt::onTimer(error_code ec) { if (!socket_.is_open()) return; - if (ec == boost::asio::error::operation_aborted) - return; + if (ec) { + if (ec == boost::asio::error::operation_aborted) + return; + // This should never happen JLOG(journal_.error()) << "onTimer: " << ec.message(); return close(); } + fail("Timeout"); } @@ -166,18 +221,26 @@ ConnectAttempt::onConnect(error_code ec) { cancelTimer(); - if (ec == boost::asio::error::operation_aborted) - return; - endpoint_type local_endpoint; - if (!ec) - local_endpoint = socket_.local_endpoint(ec); - if (ec) - return fail("onConnect", ec); if (!socket_.is_open()) return; + + if (ec) + { + if (ec == boost::asio::error::operation_aborted) + return; + + return fail("onConnect", ec); + } + + // check if connection has really been established + socket_.local_endpoint(ec); + if (ec) + return fail("onConnect", ec); + JLOG(journal_.trace()) << "onConnect"; setTimer(); + stream_.set_verify_mode(boost::asio::ssl::verify_none); stream_.async_handshake( boost::asio::ssl::stream_base::client, @@ -191,24 +254,30 @@ void ConnectAttempt::onHandshake(error_code ec) { cancelTimer(); + if (!socket_.is_open()) return; - if (ec == boost::asio::error::operation_aborted) - return; - endpoint_type local_endpoint; - if (!ec) - local_endpoint = socket_.local_endpoint(ec); + + if (ec) + { + if (ec == boost::asio::error::operation_aborted) + return; + + return fail("onHandshake", ec); + } + + endpoint_type local_endpoint = socket_.local_endpoint(ec); if (ec) return fail("onHandshake", ec); - JLOG(journal_.trace()) << "onHandshake"; + // check if we connected to ourselves if (!overlay_.peerFinder().onConnected( slot_, beast::IPAddressConversion::from_asio(local_endpoint))) - return fail("Duplicate connection"); + return fail("Self connection"); auto const sharedValue = makeSharedValue(*stream_ptr_, journal_); if (!sharedValue) - return close(); // makeSharedValue logs + return shutdown(); // makeSharedValue logs req_ = makeRequest( !overlay_.peerFinder().config().peerPrivate, @@ -241,10 +310,15 @@ ConnectAttempt::onWrite(error_code ec) cancelTimer(); if (!socket_.is_open()) return; - if (ec == boost::asio::error::operation_aborted) - return; + if (ec) + { + if (ec == boost::asio::error::operation_aborted) + return; + return fail("onWrite", ec); + } + boost::beast::http::async_read( stream_, read_buf_, @@ -262,34 +336,22 @@ ConnectAttempt::onRead(error_code ec) if (!socket_.is_open()) return; - if (ec == boost::asio::error::operation_aborted) - return; - if (ec == boost::asio::error::eof) - { - JLOG(journal_.info()) << "EOF"; - setTimer(); - return stream_.async_shutdown(strand_.wrap(std::bind( - &ConnectAttempt::onShutdown, - shared_from_this(), - std::placeholders::_1))); - } - if (ec) - return fail("onRead", ec); - processResponse(); -} -void -ConnectAttempt::onShutdown(error_code ec) -{ - cancelTimer(); - if (!ec) + if (ec) { - JLOG(journal_.error()) << "onShutdown: expected error condition"; - return close(); + if (ec == boost::asio::error::eof) + { + JLOG(journal_.debug()) << "EOF"; + return shutdown(); + } + + if (ec == boost::asio::error::operation_aborted) + return; + + return fail("onRead", ec); } - if (ec != boost::asio::error::eof) - return fail("onShutdown", ec); - close(); + + processResponse(); } //-------------------------------------------------------------------------- @@ -338,7 +400,7 @@ ConnectAttempt::processResponse() JLOG(journal_.info()) << "Unable to upgrade to peer protocol: " << response_.result() << " (" << response_.reason() << ")"; - return close(); + return shutdown(); } // Just because our peer selected a particular protocol version doesn't @@ -358,7 +420,7 @@ ConnectAttempt::processResponse() auto const sharedValue = makeSharedValue(*stream_ptr_, journal_); if (!sharedValue) - return close(); // makeSharedValue logs + return shutdown(); // makeSharedValue logs try { @@ -385,7 +447,12 @@ ConnectAttempt::processResponse() auto const result = overlay_.peerFinder().activate( slot_, publicKey, static_cast(member)); if (result != PeerFinder::Result::success) - return fail("Outbound " + std::string(to_string(result))); + { + std::stringstream ss; + ss << "Outbound Connect Attempt " << remote_endpoint_ << " " + << to_string(result); + return fail(ss.str()); + } auto const peer = std::make_shared( app_, diff --git a/src/xrpld/overlay/detail/ConnectAttempt.h b/src/xrpld/overlay/detail/ConnectAttempt.h index c3e07f956a..415c83d278 100644 --- a/src/xrpld/overlay/detail/ConnectAttempt.h +++ b/src/xrpld/overlay/detail/ConnectAttempt.h @@ -59,6 +59,7 @@ private: response_type response_; std::shared_ptr slot_; request_type req_; + bool shutdown_ = false; public: ConnectAttempt( @@ -81,12 +82,6 @@ public: run(); private: - void - close(); - void - fail(std::string const& reason); - void - fail(std::string const& name, error_code ec); void setTimer(); void @@ -102,7 +97,16 @@ private: void onRead(error_code ec); void + fail(std::string const& reason); + void + fail(std::string const& name, error_code ec); + void + shutdown(); + void onShutdown(error_code ec); + void + close(); + void processResponse();