diff --git a/src/xrpld/overlay/detail/ConnectAttempt.cpp b/src/xrpld/overlay/detail/ConnectAttempt.cpp index 61049579c5..71a511a9f6 100644 --- a/src/xrpld/overlay/detail/ConnectAttempt.cpp +++ b/src/xrpld/overlay/detail/ConnectAttempt.cpp @@ -24,6 +24,8 @@ #include +#include + namespace ripple { ConnectAttempt::ConnectAttempt( @@ -94,13 +96,16 @@ ConnectAttempt::close() XRPL_ASSERT( strand_.running_in_this_thread(), "ripple::ConnectAttempt::close : strand in this thread"); - if (socket_.is_open()) - { - error_code ec; - timer_.cancel(ec); - socket_.close(ec); - JLOG(journal_.debug()) << "Closed"; - } + if (!socket_.is_open()) + return; + + cancelTimer(); + + error_code ec; + stream_.shutdown(ec); + socket_.close(ec); + + JLOG(journal_.debug()) << "Closed"; } void @@ -120,13 +125,15 @@ ConnectAttempt::fail(std::string const& name, error_code ec) void ConnectAttempt::setTimer() { - error_code ec; - timer_.expires_from_now(std::chrono::seconds(15), ec); - if (ec) + try { - JLOG(journal_.error()) << "setTimer: " << ec.message(); - return; + timer_.expires_after(std::chrono::seconds(15)); } + catch (std::exception const& ex) + { + JLOG(journal_.error()) << "setTimer: " << ex.what(); + return; + }; timer_.async_wait(strand_.wrap(std::bind( &ConnectAttempt::onTimer, shared_from_this(), std::placeholders::_1))); diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index 23b4760488..23cf2e655e 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -221,7 +221,6 @@ PeerImp::stop() // outbound connections are under our control and may be logged // at a higher level, but inbound connections are more numerous and // uncontrolled so to prevent log flooding the severity is reduced. - // if (inbound_) { JLOG(journal_.debug()) << "Stop"; @@ -243,8 +242,6 @@ PeerImp::send(std::shared_ptr const& m) return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m)); if (gracefulClose_) return; - if (detaching_) - return; auto validator = m->getValidatorKey(); if (validator && !squelch_.expireSquelch(*validator)) @@ -578,21 +575,29 @@ PeerImp::close() XRPL_ASSERT( strand_.running_in_this_thread(), "ripple::PeerImp::close : strand in this thread"); - if (socket_.is_open()) + + // the socket is closed, this may due to concurrent calls to close() + if (!socket_.is_open()) + return; + + // gracefully shutdown the SSL socket, performing a shutdown handshake + error_code ec; + timer_.cancel(ec); + stream_.shutdown(ec); + socket_.close(ec); + overlay_.incPeerDisconnect(); + + // The rationale for using different severity levels is that + // outbound connections are under our control and may be logged + // at a higher level, but inbound connections are more numerous and + // uncontrolled so to prevent log flooding the severity is reduced. + if (inbound_) { - detaching_ = true; // DEPRECATED - error_code ec; - timer_.cancel(ec); - socket_.close(ec); - overlay_.incPeerDisconnect(); - if (inbound_) - { - JLOG(journal_.debug()) << "Closed"; - } - else - { - JLOG(journal_.info()) << "Closed"; - } + JLOG(journal_.debug()) << "Closed"; + } + else + { + JLOG(journal_.info()) << "Closed"; } } @@ -606,12 +611,14 @@ PeerImp::fail(std::string const& reason) (void(Peer::*)(std::string const&)) & PeerImp::fail, shared_from_this(), reason)); - if (journal_.active(beast::severities::kWarning) && socket_.is_open()) - { - std::string const n = name(); - JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n) - << " failed: " << reason; - } + + if (!socket_.is_open()) + return; + + std::string const n = name(); + JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n) + << " failed: " << reason; + close(); } @@ -621,12 +628,15 @@ PeerImp::fail(std::string const& name, error_code ec) XRPL_ASSERT( strand_.running_in_this_thread(), "ripple::PeerImp::fail : strand in this thread"); - if (socket_.is_open()) - { - JLOG(journal_.warn()) - << name << " from " << toBase58(TokenType::NodePublic, publicKey_) - << " at " << remote_address_.to_string() << ": " << ec.message(); - } + + if (!socket_.is_open()) + return; + + JLOG(journal_.warn()) << name << " from " + << toBase58(TokenType::NodePublic, publicKey_) + << " at " << remote_address_.to_string() << ": " + << ec.message(); + close(); } @@ -641,41 +651,35 @@ PeerImp::gracefulClose() XRPL_ASSERT( !gracefulClose_, "ripple::PeerImp::gracefulClose : socket is not closing"); + gracefulClose_ = true; + if (send_queue_.size() > 0) return; - setTimer(); - stream_.async_shutdown(bind_executor( - strand_, - std::bind( - &PeerImp::onShutdown, shared_from_this(), std::placeholders::_1))); + + close(); } void PeerImp::setTimer() { - error_code ec; - timer_.expires_from_now(peerTimerInterval, ec); - - if (ec) + try { - JLOG(journal_.error()) << "setTimer: " << ec.message(); - return; + timer_.expires_after(peerTimerInterval); } + catch (std::exception const& ex) + { + JLOG(journal_.error()) + << "setTimer: error expiring the timer: " << ex.what(); + return close(); + }; + timer_.async_wait(bind_executor( strand_, std::bind( &PeerImp::onTimer, shared_from_this(), std::placeholders::_1))); } -// convenience for ignoring the error code -void -PeerImp::cancelTimer() -{ - error_code ec; - timer_.cancel(ec); -} - //------------------------------------------------------------------------------ std::string @@ -703,10 +707,7 @@ PeerImp::onTimer(error_code const& ec) } if (large_sendq_++ >= Tuning::sendqIntervals) - { - fail("Large send queue"); - return; - } + return fail("Large send queue"); if (auto const t = tracking_.load(); !inbound_ && t != Tracking::converged) { @@ -723,17 +724,13 @@ PeerImp::onTimer(error_code const& ec) (duration > app_.config().MAX_UNKNOWN_TIME))) { overlay_.peerFinder().on_failure(slot_); - fail("Not useful"); - return; + return fail("Not useful"); } } // Already waiting for PONG if (lastPingSeq_) - { - fail("Ping Timeout"); - return; - } + return fail("Ping Timeout"); lastPingTime_ = clock_type::now(); lastPingSeq_ = rand_int(); @@ -747,21 +744,6 @@ PeerImp::onTimer(error_code const& ec) setTimer(); } -void -PeerImp::onShutdown(error_code ec) -{ - cancelTimer(); - // If we don't get eof then something went wrong - if (!ec) - { - JLOG(journal_.error()) << "onShutdown: expected error condition"; - return close(); - } - if (ec != boost::asio::error::eof) - return fail("onShutdown", ec); - close(); -} - //------------------------------------------------------------------------------ void PeerImp::doAccept() @@ -898,8 +880,10 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred) JLOG(journal_.info()) << "EOF"; return gracefulClose(); } + if (ec) return fail("onReadMessage", ec); + if (auto stream = journal_.trace()) { if (bytes_transferred > 0) @@ -927,12 +911,12 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred) 350ms, journal_); - if (ec) - return fail("onReadMessage", ec); if (!socket_.is_open()) return; if (gracefulClose_) return; + if (ec) + return fail("onReadMessage", ec); if (bytes_consumed == 0) break; read_buffer_.consume(bytes_consumed); @@ -959,6 +943,7 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred) return; if (ec) return fail("onWriteMessage", ec); + if (auto stream = journal_.trace()) { if (bytes_transferred > 0) @@ -989,15 +974,11 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred) std::placeholders::_2))); } + // Graceful close was initiated, but it was postponed to allow the server to + // finish sending messages to the peer. If we hit this code, that means all + // messages were sent, finish closing the connection if (gracefulClose_) - { - return stream_.async_shutdown(bind_executor( - strand_, - std::bind( - &PeerImp::onShutdown, - shared_from_this(), - std::placeholders::_1))); - } + close(); } //------------------------------------------------------------------------------ diff --git a/src/xrpld/overlay/detail/PeerImp.h b/src/xrpld/overlay/detail/PeerImp.h index 5aa49fd152..05b032d67c 100644 --- a/src/xrpld/overlay/detail/PeerImp.h +++ b/src/xrpld/overlay/detail/PeerImp.h @@ -95,7 +95,6 @@ private: std::atomic tracking_; clock_type::time_point trackingTime_; - bool detaching_ = false; // Node public key of peer. PublicKey const publicKey_; std::string name_; @@ -425,9 +424,6 @@ public: bool isHighLatency() const override; - void - fail(std::string const& reason); - bool compressionEnabled() const override { @@ -441,21 +437,63 @@ public: } private: - void - close(); - + /** @brief Closes the connection and logs the reason for failure. + * + * @param name A string identifying the operation or context of the failure. + * @param ec The `error_code` associated with the failure. + * @note This operation is idempotent; calling it on an already closed + * socket has no effect. + * @note Must be called on the peer's strand. + */ void fail(std::string const& name, error_code ec); + /** @brief Closes the connection and logs a descriptive reason. + * + * @param reason A human-readable string explaining why the peer connection + * is being terminated. + * @note This operation is idempotent; calling it on an already closed + * socket has no effect. + * @note Must be called on the peer's strand. + */ + void + fail(std::string const& reason); + + /** @brief Forcibly terminates the peer connection and performs cleanup. + * + * This function terminates the peer connection. It performs graceful SSL + * shutdown, closes the underlying network socket and cancels pending + * timers. + * + * @note This operation is idempotent; it's safe to call on a connection + * that is already closed. + * @note Must be called on the peer's strand. + */ + void + close(); + + /** @brief Initiates a graceful shutdown of the peer connection. + * + * This function marks the connection for closure. A "graceful" close + * ensures that any messages already queued for sending are transmitted + * before the underlying socket is closed. The connection may still be + * terminated forcefully if the remote server stopped reading the messages. + * + * + * If the send queue is empty, the connection is closed immediately. If + * messages are still pending, the actual socket closure is deferred until + * the send queue is drained by the I/O processing logic. + * + * @note This operation is idempotent; calling it on a connection that is + * already closed or in the process of closing has no effect. + * @note Must be called on the peer's strand. + */ void gracefulClose(); void setTimer(); - void - cancelTimer(); - static std::string makePrefix(id_t id); @@ -463,10 +501,6 @@ private: void onTimer(boost::system::error_code const& ec); - // Called when SSL shutdown completes - void - onShutdown(error_code ec); - void doAccept();