From 1643d22103eb4ef4f7ee058b3697eaa899b8713b Mon Sep 17 00:00:00 2001 From: Vito Tumas <5780819+Tapanito@users.noreply.github.com> Date: Wed, 8 Oct 2025 20:16:47 +0200 Subject: [PATCH] Revert "Bugfix: Adds graceful peer disconnection (#5669)" (#5855) This reverts commit 17a2606591cf4f45b03b128630895ebef66b42bd. --- src/xrpld/overlay/detail/ConnectAttempt.cpp | 474 ++++++-------------- src/xrpld/overlay/detail/ConnectAttempt.h | 208 +-------- src/xrpld/overlay/detail/PeerImp.cpp | 399 +++++++--------- src/xrpld/overlay/detail/PeerImp.h | 208 +-------- 4 files changed, 329 insertions(+), 960 deletions(-) diff --git a/src/xrpld/overlay/detail/ConnectAttempt.cpp b/src/xrpld/overlay/detail/ConnectAttempt.cpp index c1bc4bb069..397ac06ba6 100644 --- a/src/xrpld/overlay/detail/ConnectAttempt.cpp +++ b/src/xrpld/overlay/detail/ConnectAttempt.cpp @@ -24,8 +24,6 @@ #include -#include - namespace ripple { ConnectAttempt::ConnectAttempt( @@ -47,7 +45,6 @@ ConnectAttempt::ConnectAttempt( , usage_(usage) , strand_(boost::asio::make_strand(io_context)) , timer_(io_context) - , stepTimer_(io_context) , stream_ptr_(std::make_unique( socket_type(std::forward(io_context)), *context)) @@ -55,14 +52,14 @@ ConnectAttempt::ConnectAttempt( , stream_(*stream_ptr_) , slot_(slot) { + JLOG(journal_.debug()) << "Connect " << remote_endpoint; } ConnectAttempt::~ConnectAttempt() { - // slot_ will be null if we successfully connected - // and transferred ownership to a PeerImp if (slot_ != nullptr) overlay_.peerFinder().on_closed(slot_); + JLOG(journal_.trace()) << "~ConnectAttempt"; } void @@ -71,29 +68,16 @@ ConnectAttempt::stop() if (!strand_.running_in_this_thread()) return boost::asio::post( strand_, std::bind(&ConnectAttempt::stop, shared_from_this())); - - if (!socket_.is_open()) - return; - - JLOG(journal_.debug()) << "stop: Stop"; - - shutdown(); + if (socket_.is_open()) + { + JLOG(journal_.debug()) << "Stop"; + } + close(); } void ConnectAttempt::run() { - if (!strand_.running_in_this_thread()) - return boost::asio::post( - strand_, std::bind(&ConnectAttempt::run, shared_from_this())); - - JLOG(journal_.debug()) << "run: connecting to " << remote_endpoint_; - - ioPending_ = true; - - // Allow up to connectTimeout_ seconds to establish remote peer connection - setTimer(ConnectionStep::TcpConnect); - stream_.next_layer().async_connect( remote_endpoint_, boost::asio::bind_executor( @@ -106,177 +90,61 @@ ConnectAttempt::run() //------------------------------------------------------------------------------ -void -ConnectAttempt::shutdown() -{ - XRPL_ASSERT( - strand_.running_in_this_thread(), - "ripple::ConnectAttempt::shutdown: strand in this thread"); - - if (!socket_.is_open()) - return; - - shutdown_ = true; - boost::beast::get_lowest_layer(stream_).cancel(); - - tryAsyncShutdown(); -} - -void -ConnectAttempt::tryAsyncShutdown() -{ - XRPL_ASSERT( - strand_.running_in_this_thread(), - "ripple::ConnectAttempt::tryAsyncShutdown : strand in this thread"); - - if (!shutdown_ || currentStep_ == ConnectionStep::ShutdownStarted) - return; - - if (ioPending_) - return; - - // gracefully shutdown the SSL socket, performing a shutdown handshake - if (currentStep_ != ConnectionStep::TcpConnect && - currentStep_ != ConnectionStep::TlsHandshake) - { - setTimer(ConnectionStep::ShutdownStarted); - return stream_.async_shutdown(bind_executor( - strand_, - std::bind( - &ConnectAttempt::onShutdown, - shared_from_this(), - std::placeholders::_1))); - } - - close(); -} - -void -ConnectAttempt::onShutdown(error_code ec) -{ - cancelTimer(); - - if (ec) - { - // - eof: the stream was cleanly closed - // - operation_aborted: an expired timer (slow shutdown) - // - stream_truncated: the tcp connection closed (no handshake) it could - // occur if a peer does not perform a graceful disconnect - // - broken_pipe: the peer is gone - // - application data after close notify: benign SSL shutdown condition - bool shouldLog = - (ec != boost::asio::error::eof && - ec != boost::asio::error::operation_aborted && - ec.message().find("application data after close notify") == - std::string::npos); - - if (shouldLog) - { - JLOG(journal_.debug()) << "onShutdown: " << ec.message(); - } - } - - close(); -} - void ConnectAttempt::close() { XRPL_ASSERT( strand_.running_in_this_thread(), "ripple::ConnectAttempt::close : strand in this thread"); - if (!socket_.is_open()) - return; + if (socket_.is_open()) + { + try + { + timer_.cancel(); + socket_.close(); + } + catch (boost::system::system_error const&) + { + // ignored + } - cancelTimer(); - - error_code ec; - socket_.close(ec); + JLOG(journal_.debug()) << "Closed"; + } } void ConnectAttempt::fail(std::string const& reason) { JLOG(journal_.debug()) << reason; - shutdown(); + close(); } void ConnectAttempt::fail(std::string const& name, error_code ec) { JLOG(journal_.debug()) << name << ": " << ec.message(); - shutdown(); + close(); } void -ConnectAttempt::setTimer(ConnectionStep step) +ConnectAttempt::setTimer() { - currentStep_ = step; - - // Set global timer (only if not already set) - if (timer_.expiry() == std::chrono::steady_clock::time_point{}) - { - try - { - timer_.expires_after(connectTimeout); - timer_.async_wait(boost::asio::bind_executor( - strand_, - std::bind( - &ConnectAttempt::onTimer, - shared_from_this(), - std::placeholders::_1))); - } - catch (std::exception const& ex) - { - JLOG(journal_.error()) << "setTimer (global): " << ex.what(); - return close(); - } - } - - // Set step-specific timer try { - std::chrono::seconds stepTimeout; - switch (step) - { - case ConnectionStep::TcpConnect: - stepTimeout = StepTimeouts::tcpConnect; - break; - case ConnectionStep::TlsHandshake: - stepTimeout = StepTimeouts::tlsHandshake; - break; - case ConnectionStep::HttpWrite: - stepTimeout = StepTimeouts::httpWrite; - break; - case ConnectionStep::HttpRead: - stepTimeout = StepTimeouts::httpRead; - break; - case ConnectionStep::ShutdownStarted: - stepTimeout = StepTimeouts::tlsShutdown; - break; - case ConnectionStep::Complete: - case ConnectionStep::Init: - return; // No timer needed for init or complete step - } - - // call to expires_after cancels previous timer - stepTimer_.expires_after(stepTimeout); - stepTimer_.async_wait(boost::asio::bind_executor( - strand_, - std::bind( - &ConnectAttempt::onTimer, - shared_from_this(), - std::placeholders::_1))); - - JLOG(journal_.trace()) << "setTimer: " << stepToString(step) - << " timeout=" << stepTimeout.count() << "s"; + timer_.expires_after(std::chrono::seconds(15)); } - catch (std::exception const& ex) + catch (boost::system::system_error const& e) { - JLOG(journal_.error()) - << "setTimer (step " << stepToString(step) << "): " << ex.what(); - return close(); + JLOG(journal_.error()) << "setTimer: " << e.code(); + return; } + + timer_.async_wait(boost::asio::bind_executor( + strand_, + std::bind( + &ConnectAttempt::onTimer, + shared_from_this(), + std::placeholders::_1))); } void @@ -285,7 +153,6 @@ ConnectAttempt::cancelTimer() try { timer_.cancel(); - stepTimer_.cancel(); } catch (boost::system::system_error const&) { @@ -298,69 +165,34 @@ ConnectAttempt::onTimer(error_code ec) { if (!socket_.is_open()) return; - + if (ec == boost::asio::error::operation_aborted) + return; if (ec) { - // do not initiate shutdown, timers are frequently cancelled - if (ec == boost::asio::error::operation_aborted) - return; - // This should never happen JLOG(journal_.error()) << "onTimer: " << ec.message(); return close(); } - - // Determine which timer expired by checking their expiry times - auto const now = std::chrono::steady_clock::now(); - bool globalExpired = (timer_.expiry() <= now); - bool stepExpired = (stepTimer_.expiry() <= now); - - if (globalExpired) - { - JLOG(journal_.debug()) - << "onTimer: Global timeout; step: " << stepToString(currentStep_); - } - else if (stepExpired) - { - JLOG(journal_.debug()) - << "onTimer: Step timeout; step: " << stepToString(currentStep_); - } - else - { - JLOG(journal_.warn()) << "onTimer: Unexpected timer callback"; - } - - close(); + fail("Timeout"); } void ConnectAttempt::onConnect(error_code ec) { - ioPending_ = false; + cancelTimer(); + if (ec == boost::asio::error::operation_aborted) + return; + endpoint_type local_endpoint; + if (!ec) + local_endpoint = socket_.local_endpoint(ec); if (ec) - { - if (ec == boost::asio::error::operation_aborted) - return tryAsyncShutdown(); - return fail("onConnect", ec); - } - if (!socket_.is_open()) return; + JLOG(journal_.trace()) << "onConnect"; - // check if connection has really been established - socket_.local_endpoint(ec); - if (ec) - return fail("onConnect", ec); - - if (shutdown_) - return tryAsyncShutdown(); - - ioPending_ = true; - - setTimer(ConnectionStep::TlsHandshake); - + setTimer(); stream_.set_verify_mode(boost::asio::ssl::verify_none); stream_.async_handshake( boost::asio::ssl::stream_base::client, @@ -375,30 +207,25 @@ ConnectAttempt::onConnect(error_code ec) void ConnectAttempt::onHandshake(error_code ec) { - ioPending_ = false; - - if (ec) - { - if (ec == boost::asio::error::operation_aborted) - return tryAsyncShutdown(); - - return fail("onHandshake", ec); - } - - auto const local_endpoint = socket_.local_endpoint(ec); + cancelTimer(); + if (!socket_.is_open()) + return; + if (ec == boost::asio::error::operation_aborted) + return; + endpoint_type local_endpoint; + if (!ec) + local_endpoint = socket_.local_endpoint(ec); if (ec) return fail("onHandshake", ec); + JLOG(journal_.trace()) << "onHandshake"; - setTimer(ConnectionStep::HttpWrite); - - // check if we connected to ourselves if (!overlay_.peerFinder().onConnected( slot_, beast::IPAddressConversion::from_asio(local_endpoint))) - return fail("Self connection"); + return fail("Duplicate connection"); auto const sharedValue = makeSharedValue(*stream_ptr_, journal_); if (!sharedValue) - return shutdown(); // makeSharedValue logs + return close(); // makeSharedValue logs req_ = makeRequest( !overlay_.peerFinder().config().peerPrivate, @@ -415,11 +242,7 @@ ConnectAttempt::onHandshake(error_code ec) remote_endpoint_.address(), app_); - if (shutdown_) - return tryAsyncShutdown(); - - ioPending_ = true; - + setTimer(); boost::beast::http::async_write( stream_, req_, @@ -434,23 +257,13 @@ ConnectAttempt::onHandshake(error_code ec) void ConnectAttempt::onWrite(error_code ec) { - ioPending_ = false; - + cancelTimer(); + if (!socket_.is_open()) + return; + if (ec == boost::asio::error::operation_aborted) + return; if (ec) - { - if (ec == boost::asio::error::operation_aborted) - return tryAsyncShutdown(); - return fail("onWrite", ec); - } - - if (shutdown_) - return tryAsyncShutdown(); - - ioPending_ = true; - - setTimer(ConnectionStep::HttpRead); - boost::beast::http::async_read( stream_, read_buf_, @@ -467,97 +280,88 @@ void ConnectAttempt::onRead(error_code ec) { cancelTimer(); - ioPending_ = false; - currentStep_ = ConnectionStep::Complete; - if (ec) + if (!socket_.is_open()) + return; + if (ec == boost::asio::error::operation_aborted) + return; + if (ec == boost::asio::error::eof) { - if (ec == boost::asio::error::eof) - { - JLOG(journal_.debug()) << "EOF"; - return shutdown(); - } - - if (ec == boost::asio::error::operation_aborted) - return tryAsyncShutdown(); - - return fail("onRead", ec); + JLOG(journal_.info()) << "EOF"; + setTimer(); + return stream_.async_shutdown(boost::asio::bind_executor( + strand_, + std::bind( + &ConnectAttempt::onShutdown, + shared_from_this(), + std::placeholders::_1))); } - - if (shutdown_) - return tryAsyncShutdown(); - + if (ec) + return fail("onRead", ec); processResponse(); } +void +ConnectAttempt::onShutdown(error_code ec) +{ + cancelTimer(); + if (!ec) + { + JLOG(journal_.error()) << "onShutdown: expected error condition"; + return close(); + } + if (ec != boost::asio::error::eof) + return fail("onShutdown", ec); + close(); +} + //-------------------------------------------------------------------------- void ConnectAttempt::processResponse() { - if (!OverlayImpl::isPeerUpgrade(response_)) + if (response_.result() == boost::beast::http::status::service_unavailable) { - // A peer may respond with service_unavailable and a list of alternative - // peers to connect to, a differing status code is unexpected - if (response_.result() != - boost::beast::http::status::service_unavailable) - { - JLOG(journal_.warn()) - << "Unable to upgrade to peer protocol: " << response_.result() - << " (" << response_.reason() << ")"; - return shutdown(); - } - - // Parse response body to determine if this is a redirect or other - // service unavailable - std::string responseBody; - responseBody.reserve(boost::asio::buffer_size(response_.body().data())); + Json::Value json; + Json::Reader r; + std::string s; + s.reserve(boost::asio::buffer_size(response_.body().data())); for (auto const buffer : response_.body().data()) - responseBody.append( + s.append( static_cast(buffer.data()), boost::asio::buffer_size(buffer)); - - Json::Value json; - Json::Reader reader; - auto const isValidJson = reader.parse(responseBody, json); - - // Check if this is a redirect response (contains peer-ips field) - auto const isRedirect = - isValidJson && json.isObject() && json.isMember("peer-ips"); - - if (!isRedirect) + auto const success = r.parse(s, json); + if (success) { - JLOG(journal_.warn()) - << "processResponse: " << remote_endpoint_ - << " failed to upgrade to peer protocol: " << response_.result() - << " (" << response_.reason() << ")"; - - return shutdown(); + if (json.isObject() && json.isMember("peer-ips")) + { + Json::Value const& ips = json["peer-ips"]; + if (ips.isArray()) + { + std::vector eps; + eps.reserve(ips.size()); + for (auto const& v : ips) + { + if (v.isString()) + { + error_code ec; + auto const ep = parse_endpoint(v.asString(), ec); + if (!ec) + eps.push_back(ep); + } + } + overlay_.peerFinder().onRedirects(remote_endpoint_, eps); + } + } } + } - Json::Value const& peerIps = json["peer-ips"]; - if (!peerIps.isArray()) - return fail("processResponse: invalid peer-ips format"); - - // Extract and validate peer endpoints - std::vector redirectEndpoints; - redirectEndpoints.reserve(peerIps.size()); - - for (auto const& ipValue : peerIps) - { - if (!ipValue.isString()) - continue; - - error_code ec; - auto const endpoint = parse_endpoint(ipValue.asString(), ec); - if (!ec) - redirectEndpoints.push_back(endpoint); - } - - // Notify PeerFinder about the redirect redirectEndpoints may be empty - overlay_.peerFinder().onRedirects(remote_endpoint_, redirectEndpoints); - - return fail("processResponse: failed to connect to peer: redirected"); + if (!OverlayImpl::isPeerUpgrade(response_)) + { + JLOG(journal_.info()) + << "Unable to upgrade to peer protocol: " << response_.result() + << " (" << response_.reason() << ")"; + return close(); } // Just because our peer selected a particular protocol version doesn't @@ -577,11 +381,11 @@ ConnectAttempt::processResponse() auto const sharedValue = makeSharedValue(*stream_ptr_, journal_); if (!sharedValue) - return shutdown(); // makeSharedValue logs + return close(); // makeSharedValue logs try { - auto const publicKey = verifyHandshake( + auto publicKey = verifyHandshake( response_, *sharedValue, overlay_.setup().networkID, @@ -589,32 +393,22 @@ ConnectAttempt::processResponse() remote_endpoint_.address(), app_); - JLOG(journal_.debug()) - << "Protocol: " << to_string(*negotiatedProtocol); JLOG(journal_.info()) << "Public Key: " << toBase58(TokenType::NodePublic, publicKey); + JLOG(journal_.debug()) + << "Protocol: " << to_string(*negotiatedProtocol); + auto const member = app_.cluster().member(publicKey); if (member) { JLOG(journal_.info()) << "Cluster name: " << *member; } - auto const result = - overlay_.peerFinder().activate(slot_, publicKey, !member->empty()); + auto const result = overlay_.peerFinder().activate( + slot_, publicKey, static_cast(member)); if (result != PeerFinder::Result::success) - { - std::stringstream ss; - ss << "Outbound Connect Attempt " << remote_endpoint_ << " " - << to_string(result); - return fail(ss.str()); - } - - if (!socket_.is_open()) - return; - - if (shutdown_) - return tryAsyncShutdown(); + return fail("Outbound " + std::string(to_string(result))); auto const peer = std::make_shared( app_, diff --git a/src/xrpld/overlay/detail/ConnectAttempt.h b/src/xrpld/overlay/detail/ConnectAttempt.h index 38b9482d9d..febbe88f45 100644 --- a/src/xrpld/overlay/detail/ConnectAttempt.h +++ b/src/xrpld/overlay/detail/ConnectAttempt.h @@ -22,258 +22,90 @@ #include -#include - namespace ripple { -/** - * @class ConnectAttempt - * @brief Manages outbound peer connection attempts with comprehensive timeout - * handling - * - * The ConnectAttempt class handles the complete lifecycle of establishing an - * outbound connection to a peer in the XRPL network. It implements a - * sophisticated dual-timer system that provides both global timeout protection - * and per-step timeout diagnostics. - * - * The connection establishment follows these steps: - * 1. **TCP Connect**: Establish basic network connection - * 2. **TLS Handshake**: Negotiate SSL/TLS encryption - * 3. **HTTP Write**: Send peer handshake request - * 4. **HTTP Read**: Receive and validate peer response - * 5. **Complete**: Connection successfully established - * - * Uses a hybrid timeout approach: - * - **Global Timer**: Hard limit (20s) for entire connection process - * - **Step Timers**: Individual timeouts for each connection phase - * - * - All errors result in connection termination - * - * All operations are serialized using boost::asio::strand to ensure thread - * safety. The class is designed to be used exclusively within the ASIO event - * loop. - * - * @note This class should not be used directly. It is managed by OverlayImpl - * as part of the peer discovery and connection management system. - * - */ +/** Manages an outbound connection attempt. */ class ConnectAttempt : public OverlayImpl::Child, public std::enable_shared_from_this { private: using error_code = boost::system::error_code; + using endpoint_type = boost::asio::ip::tcp::endpoint; + using request_type = boost::beast::http::request; + using response_type = boost::beast::http::response; + using socket_type = boost::asio::ip::tcp::socket; using middle_type = boost::beast::tcp_stream; using stream_type = boost::beast::ssl_stream; using shared_context = std::shared_ptr; - /** - * @enum ConnectionStep - * @brief Represents the current phase of the connection establishment - * process - * - * Used for tracking progress and providing detailed timeout diagnostics. - * Each step has its own timeout value defined in StepTimeouts. - */ - enum class ConnectionStep { - Init, // Initial state, nothing started - TcpConnect, // Establishing TCP connection to remote peer - TlsHandshake, // Performing SSL/TLS handshake - HttpWrite, // Sending HTTP upgrade request - HttpRead, // Reading HTTP upgrade response - Complete, // Connection successfully established - ShutdownStarted // Connection shutdown has started - }; - - // A timeout for connection process, greater than all step timeouts - static constexpr std::chrono::seconds connectTimeout{25}; - - /** - * @struct StepTimeouts - * @brief Defines timeout values for each connection step - * - * These timeouts are designed to detect slow individual phases while - * allowing the global timeout to enforce the overall time limit. - */ - struct StepTimeouts - { - // TCP connection timeout - static constexpr std::chrono::seconds tcpConnect{8}; - // SSL handshake timeout - static constexpr std::chrono::seconds tlsHandshake{8}; - // HTTP write timeout - static constexpr std::chrono::seconds httpWrite{3}; - // HTTP read timeout - static constexpr std::chrono::seconds httpRead{3}; - // SSL shutdown timeout - static constexpr std::chrono::seconds tlsShutdown{2}; - }; - - // Core application and networking components Application& app_; - Peer::id_t const id_; + std::uint32_t const id_; beast::WrappedSink sink_; beast::Journal const journal_; endpoint_type remote_endpoint_; Resource::Consumer usage_; - boost::asio::strand strand_; boost::asio::basic_waitable_timer timer_; - boost::asio::basic_waitable_timer stepTimer_; - - std::unique_ptr stream_ptr_; // SSL stream (owned) + std::unique_ptr stream_ptr_; socket_type& socket_; stream_type& stream_; boost::beast::multi_buffer read_buf_; - response_type response_; std::shared_ptr slot_; request_type req_; - bool shutdown_ = false; // Shutdown has been initiated - bool ioPending_ = false; // Async I/O operation in progress - ConnectionStep currentStep_ = ConnectionStep::Init; - public: - /** - * @brief Construct a new ConnectAttempt object - * - * @param app Application context providing configuration and services - * @param io_context ASIO I/O context for async operations - * @param remote_endpoint Target peer endpoint to connect to - * @param usage Resource usage tracker for rate limiting - * @param context Shared SSL context for encryption - * @param id Unique peer identifier for this connection attempt - * @param slot PeerFinder slot representing this connection - * @param journal Logging interface for diagnostics - * @param overlay Parent overlay manager - * - * @note The constructor only initializes the object. Call run() to begin - * the actual connection attempt. - */ ConnectAttempt( Application& app, boost::asio::io_context& io_context, endpoint_type const& remote_endpoint, Resource::Consumer usage, shared_context const& context, - Peer::id_t id, + std::uint32_t id, std::shared_ptr const& slot, beast::Journal journal, OverlayImpl& overlay); ~ConnectAttempt(); - /** - * @brief Stop the connection attempt - * - * This method is thread-safe and can be called from any thread. - */ void stop() override; - /** - * @brief Begin the connection attempt - * - * This method is thread-safe and posts to the strand if needed. - */ void run(); private: - /** - * @brief Set timers for the specified connection step - * - * @param step The connection step to set timers for - * - * Sets both the step-specific timer and the global timer (if not already - * set). - */ void - setTimer(ConnectionStep step); - - /** - * @brief Cancel both global and step timers - * - * Used during cleanup and when connection completes successfully. - * Exceptions from timer cancellation are safely ignored. - */ + close(); + void + fail(std::string const& reason); + void + fail(std::string const& name, error_code ec); + void + setTimer(); void cancelTimer(); - - /** - * @brief Handle timer expiration events - * - * @param ec Error code from timer operation - * - * Determines which timer expired (global vs step) and logs appropriate - * diagnostic information before terminating the connection. - */ void onTimer(error_code ec); - - // Connection phase handlers void - onConnect(error_code ec); // TCP connection completion handler + onConnect(error_code ec); void - onHandshake(error_code ec); // TLS handshake completion handler + onHandshake(error_code ec); void - onWrite(error_code ec); // HTTP write completion handler + onWrite(error_code ec); void - onRead(error_code ec); // HTTP read completion handler - - // Error and cleanup handlers + onRead(error_code ec); void - fail(std::string const& reason); // Fail with custom reason - void - fail(std::string const& name, error_code ec); // Fail with system error - void - shutdown(); // Initiate graceful shutdown - void - tryAsyncShutdown(); // Attempt async SSL shutdown - void - onShutdown(error_code ec); // SSL shutdown completion handler - void - close(); // Force close socket - - /** - * @brief Process the HTTP upgrade response from peer - * - * Validates the peer's response, extracts protocol information, - * verifies handshake, and either creates a PeerImp or handles - * redirect responses. - */ + onShutdown(error_code ec); void processResponse(); - static std::string - stepToString(ConnectionStep step) - { - switch (step) - { - case ConnectionStep::Init: - return "Init"; - case ConnectionStep::TcpConnect: - return "TcpConnect"; - case ConnectionStep::TlsHandshake: - return "TlsHandshake"; - case ConnectionStep::HttpWrite: - return "HttpWrite"; - case ConnectionStep::HttpRead: - return "HttpRead"; - case ConnectionStep::Complete: - return "Complete"; - case ConnectionStep::ShutdownStarted: - return "ShutdownStarted"; - } - return "Unknown"; - }; - template static boost::asio::ip::tcp::endpoint parse_endpoint(std::string const& s, boost::system::error_code& ec) diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index 93371f42ab..2cd9432eb8 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -44,7 +44,6 @@ #include #include -#include #include #include #include @@ -60,10 +59,6 @@ std::chrono::milliseconds constexpr peerHighLatency{300}; /** How often we PING the peer to check for latency and sendq probe */ std::chrono::seconds constexpr peerTimerInterval{60}; - -/** The timeout for a shutdown timer */ -std::chrono::seconds constexpr shutdownTimerInterval{5}; - } // namespace // TODO: Remove this exclusion once unit tests are added after the hotfix @@ -220,17 +215,23 @@ PeerImp::stop() { if (!strand_.running_in_this_thread()) return post(strand_, std::bind(&PeerImp::stop, shared_from_this())); - - if (!socket_.is_open()) - return; - - // The rationale for using different severity levels is that - // outbound connections are under our control and may be logged - // at a higher level, but inbound connections are more numerous and - // uncontrolled so to prevent log flooding the severity is reduced. - JLOG(journal_.debug()) << "stop: Stop"; - - shutdown(); + if (socket_.is_open()) + { + // The rationale for using different severity levels is that + // outbound connections are under our control and may be logged + // at a higher level, but inbound connections are more numerous and + // uncontrolled so to prevent log flooding the severity is reduced. + // + if (inbound_) + { + JLOG(journal_.debug()) << "Stop"; + } + else + { + JLOG(journal_.info()) << "Stop"; + } + } + close(); } //------------------------------------------------------------------------------ @@ -240,13 +241,10 @@ PeerImp::send(std::shared_ptr const& m) { if (!strand_.running_in_this_thread()) return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m)); - - if (!socket_.is_open()) + if (gracefulClose_) + return; + if (detaching_) return; - - // we are in progress of closing the connection - if (shutdown_) - return tryAsyncShutdown(); auto validator = m->getValidatorKey(); if (validator && !squelch_.expireSquelch(*validator)) @@ -289,7 +287,6 @@ PeerImp::send(std::shared_ptr const& m) if (sendq_size != 0) return; - writePending_ = true; boost::asio::async_write( stream_, boost::asio::buffer( @@ -576,21 +573,34 @@ PeerImp::hasRange(std::uint32_t uMin, std::uint32_t uMax) //------------------------------------------------------------------------------ void -PeerImp::fail(std::string const& name, error_code ec) +PeerImp::close() { XRPL_ASSERT( strand_.running_in_this_thread(), - "ripple::PeerImp::fail : strand in this thread"); + "ripple::PeerImp::close : strand in this thread"); + if (socket_.is_open()) + { + detaching_ = true; // DEPRECATED + try + { + timer_.cancel(); + socket_.close(); + } + catch (boost::system::system_error const&) + { + // ignored + } - if (!socket_.is_open()) - return; - - JLOG(journal_.warn()) << name << " from " - << toBase58(TokenType::NodePublic, publicKey_) - << " at " << remote_address_.to_string() << ": " - << ec.message(); - - shutdown(); + overlay_.incPeerDisconnect(); + if (inbound_) + { + JLOG(journal_.debug()) << "Closed"; + } + else + { + JLOG(journal_.info()) << "Closed"; + } + } } void @@ -603,39 +613,45 @@ PeerImp::fail(std::string const& reason) (void(Peer::*)(std::string const&)) & PeerImp::fail, shared_from_this(), reason)); - - if (!socket_.is_open()) - return; - - // Call to name() locks, log only if the message will be outputed - if (journal_.active(beast::severities::kWarning)) + if (journal_.active(beast::severities::kWarning) && socket_.is_open()) { std::string const n = name(); JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n) << " failed: " << reason; } - - shutdown(); + close(); } void -PeerImp::tryAsyncShutdown() +PeerImp::fail(std::string const& name, error_code ec) { XRPL_ASSERT( strand_.running_in_this_thread(), - "ripple::PeerImp::tryAsyncShutdown : strand in this thread"); + "ripple::PeerImp::fail : strand in this thread"); + if (socket_.is_open()) + { + JLOG(journal_.warn()) + << name << " from " << toBase58(TokenType::NodePublic, publicKey_) + << " at " << remote_address_.to_string() << ": " << ec.message(); + } + close(); +} - if (!shutdown_ || shutdownStarted_) +void +PeerImp::gracefulClose() +{ + XRPL_ASSERT( + strand_.running_in_this_thread(), + "ripple::PeerImp::gracefulClose : strand in this thread"); + XRPL_ASSERT( + socket_.is_open(), "ripple::PeerImp::gracefulClose : socket is open"); + XRPL_ASSERT( + !gracefulClose_, + "ripple::PeerImp::gracefulClose : socket is not closing"); + gracefulClose_ = true; + if (send_queue_.size() > 0) return; - - if (readPending_ || writePending_) - return; - - shutdownStarted_ = true; - - setTimer(shutdownTimerInterval); - - // gracefully shutdown the SSL socket, performing a shutdown handshake + setTimer(); stream_.async_shutdown(bind_executor( strand_, std::bind( @@ -643,124 +659,68 @@ PeerImp::tryAsyncShutdown() } void -PeerImp::shutdown() -{ - XRPL_ASSERT( - strand_.running_in_this_thread(), - "ripple::PeerImp::shutdown: strand in this thread"); - - if (!socket_.is_open() || shutdown_) - return; - - shutdown_ = true; - - boost::beast::get_lowest_layer(stream_).cancel(); - - tryAsyncShutdown(); -} - -void -PeerImp::onShutdown(error_code ec) -{ - cancelTimer(); - if (ec) - { - // - eof: the stream was cleanly closed - // - operation_aborted: an expired timer (slow shutdown) - // - stream_truncated: the tcp connection closed (no handshake) it could - // occur if a peer does not perform a graceful disconnect - // - broken_pipe: the peer is gone - bool shouldLog = - (ec != boost::asio::error::eof && - ec != boost::asio::error::operation_aborted && - ec.message().find("application data after close notify") == - std::string::npos); - - if (shouldLog) - { - JLOG(journal_.debug()) << "onShutdown: " << ec.message(); - } - } - - close(); -} - -void -PeerImp::close() -{ - XRPL_ASSERT( - strand_.running_in_this_thread(), - "ripple::PeerImp::close : strand in this thread"); - - if (!socket_.is_open()) - return; - - cancelTimer(); - - error_code ec; - socket_.close(ec); - - overlay_.incPeerDisconnect(); - - // The rationale for using different severity levels is that - // outbound connections are under our control and may be logged - // at a higher level, but inbound connections are more numerous and - // uncontrolled so to prevent log flooding the severity is reduced. - JLOG((inbound_ ? journal_.debug() : journal_.info())) << "close: Closed"; -} - -//------------------------------------------------------------------------------ - -void -PeerImp::setTimer(std::chrono::seconds interval) +PeerImp::setTimer() { try { - timer_.expires_after(interval); + timer_.expires_after(peerTimerInterval); } - catch (std::exception const& ex) + catch (boost::system::system_error const& e) { - JLOG(journal_.error()) << "setTimer: " << ex.what(); - return shutdown(); + JLOG(journal_.error()) << "setTimer: " << e.code(); + return; } - timer_.async_wait(bind_executor( strand_, std::bind( &PeerImp::onTimer, shared_from_this(), std::placeholders::_1))); } +// convenience for ignoring the error code +void +PeerImp::cancelTimer() +{ + try + { + timer_.cancel(); + } + catch (boost::system::system_error const&) + { + // ignored + } +} + +//------------------------------------------------------------------------------ + +std::string +PeerImp::makePrefix(id_t id) +{ + std::stringstream ss; + ss << "[" << std::setfill('0') << std::setw(3) << id << "] "; + return ss.str(); +} + void PeerImp::onTimer(error_code const& ec) { - XRPL_ASSERT( - strand_.running_in_this_thread(), - "ripple::PeerImp::onTimer : strand in this thread"); - if (!socket_.is_open()) return; + if (ec == boost::asio::error::operation_aborted) + return; + if (ec) { - // do not initiate shutdown, timers are frequently cancelled - if (ec == boost::asio::error::operation_aborted) - return; - // This should never happen JLOG(journal_.error()) << "onTimer: " << ec.message(); return close(); } - // the timer expired before the shutdown completed - // force close the connection - if (shutdown_) - { - JLOG(journal_.debug()) << "onTimer: shutdown timer expired"; - return close(); - } - if (large_sendq_++ >= Tuning::sendqIntervals) - return fail("Large send queue"); + { + fail("Large send queue"); + return; + } if (auto const t = tracking_.load(); !inbound_ && t != Tracking::converged) { @@ -777,13 +737,17 @@ PeerImp::onTimer(error_code const& ec) (duration > app_.config().MAX_UNKNOWN_TIME))) { overlay_.peerFinder().on_failure(slot_); - return fail("Not useful"); + fail("Not useful"); + return; } } // Already waiting for PONG if (lastPingSeq_) - return fail("Ping Timeout"); + { + fail("Ping Timeout"); + return; + } lastPingTime_ = clock_type::now(); lastPingSeq_ = rand_int(); @@ -794,28 +758,22 @@ PeerImp::onTimer(error_code const& ec) send(std::make_shared(message, protocol::mtPING)); - setTimer(peerTimerInterval); + setTimer(); } void -PeerImp::cancelTimer() noexcept +PeerImp::onShutdown(error_code ec) { - try + cancelTimer(); + // If we don't get eof then something went wrong + if (!ec) { - timer_.cancel(); + JLOG(journal_.error()) << "onShutdown: expected error condition"; + return close(); } - catch (std::exception const& ex) - { - JLOG(journal_.error()) << "cancelTimer: " << ex.what(); - } -} - -std::string -PeerImp::makePrefix(id_t id) -{ - std::stringstream ss; - ss << "[" << std::setfill('0') << std::setw(3) << id << "] "; - return ss.str(); + if (ec != boost::asio::error::eof) + return fail("onShutdown", ec); + close(); } //------------------------------------------------------------------------------ @@ -828,10 +786,6 @@ PeerImp::doAccept() JLOG(journal_.debug()) << "doAccept: " << remote_address_; - // a shutdown was initiated before the handshake, there is nothing to do - if (shutdown_) - return tryAsyncShutdown(); - auto const sharedValue = makeSharedValue(*stream_ptr_, journal_); // This shouldn't fail since we already computed @@ -839,7 +793,7 @@ PeerImp::doAccept() if (!sharedValue) return fail("makeSharedValue: Unexpected failure"); - JLOG(journal_.debug()) << "Protocol: " << to_string(protocol_); + JLOG(journal_.info()) << "Protocol: " << to_string(protocol_); JLOG(journal_.info()) << "Public Key: " << toBase58(TokenType::NodePublic, publicKey_); @@ -882,7 +836,7 @@ PeerImp::doAccept() if (!socket_.is_open()) return; if (ec == boost::asio::error::operation_aborted) - return tryAsyncShutdown(); + return; if (ec) return fail("onWriteResponse", ec); if (write_buffer->size() == bytes_transferred) @@ -911,10 +865,6 @@ PeerImp::domain() const void PeerImp::doProtocolStart() { - // a shutdown was initiated before the handshare, there is nothing to do - if (shutdown_) - return tryAsyncShutdown(); - onReadMessage(error_code(), 0); // Send all the validator lists that have been loaded @@ -946,45 +896,30 @@ PeerImp::doProtocolStart() if (auto m = overlay_.getManifestsMessage()) send(m); - setTimer(peerTimerInterval); + setTimer(); } // Called repeatedly with protocol message data void PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred) { - XRPL_ASSERT( - strand_.running_in_this_thread(), - "ripple::PeerImp::onReadMessage : strand in this thread"); - - readPending_ = false; - if (!socket_.is_open()) return; - - if (ec) + if (ec == boost::asio::error::operation_aborted) + return; + if (ec == boost::asio::error::eof) { - if (ec == boost::asio::error::eof) - { - JLOG(journal_.debug()) << "EOF"; - return shutdown(); - } - - if (ec == boost::asio::error::operation_aborted) - return tryAsyncShutdown(); - - return fail("onReadMessage", ec); + JLOG(journal_.info()) << "EOF"; + return gracefulClose(); } - // we started shutdown, no reason to process further data - if (shutdown_) - return tryAsyncShutdown(); - + if (ec) + return fail("onReadMessage", ec); if (auto stream = journal_.trace()) { - stream << "onReadMessage: " - << (bytes_transferred > 0 - ? to_string(bytes_transferred) + " bytes" - : ""); + if (bytes_transferred > 0) + stream << "onReadMessage: " << bytes_transferred << " bytes"; + else + stream << "onReadMessage"; } metrics_.recv.add_message(bytes_transferred); @@ -1006,29 +941,17 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred) 350ms, journal_); - if (!socket_.is_open()) - return; - - // the error_code is produced by invokeProtocolMessage - // it could be due to a bad message if (ec) return fail("onReadMessage", ec); - + if (!socket_.is_open()) + return; + if (gracefulClose_) + return; if (bytes_consumed == 0) break; - read_buffer_.consume(bytes_consumed); } - // check if a shutdown was initiated while processing messages - if (shutdown_) - return tryAsyncShutdown(); - - readPending_ = true; - - XRPL_ASSERT( - !shutdownStarted_, "ripple::PeerImp::onReadMessage : shutdown started"); - // Timeout on writes only stream_.async_read_some( read_buffer_.prepare(std::max(Tuning::readBufferBytes, hint)), @@ -1044,29 +967,18 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred) void PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred) { - XRPL_ASSERT( - strand_.running_in_this_thread(), - "ripple::PeerImp::onWriteMessage : strand in this thread"); - - writePending_ = false; - if (!socket_.is_open()) return; - + if (ec == boost::asio::error::operation_aborted) + return; if (ec) - { - if (ec == boost::asio::error::operation_aborted) - return tryAsyncShutdown(); - return fail("onWriteMessage", ec); - } - if (auto stream = journal_.trace()) { - stream << "onWriteMessage: " - << (bytes_transferred > 0 - ? to_string(bytes_transferred) + " bytes" - : ""); + if (bytes_transferred > 0) + stream << "onWriteMessage: " << bytes_transferred << " bytes"; + else + stream << "onWriteMessage"; } metrics_.sent.add_message(bytes_transferred); @@ -1075,17 +987,8 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred) !send_queue_.empty(), "ripple::PeerImp::onWriteMessage : non-empty send buffer"); send_queue_.pop(); - - if (shutdown_) - return tryAsyncShutdown(); - if (!send_queue_.empty()) { - writePending_ = true; - XRPL_ASSERT( - !shutdownStarted_, - "ripple::PeerImp::onWriteMessage : shutdown started"); - // Timeout on writes only return boost::asio::async_write( stream_, @@ -1099,6 +1002,16 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred) std::placeholders::_1, std::placeholders::_2))); } + + if (gracefulClose_) + { + return stream_.async_shutdown(bind_executor( + strand_, + std::bind( + &PeerImp::onShutdown, + shared_from_this(), + std::placeholders::_1))); + } } //------------------------------------------------------------------------------ diff --git a/src/xrpld/overlay/detail/PeerImp.h b/src/xrpld/overlay/detail/PeerImp.h index c2221c136d..3d9a0c0b1e 100644 --- a/src/xrpld/overlay/detail/PeerImp.h +++ b/src/xrpld/overlay/detail/PeerImp.h @@ -40,7 +40,6 @@ #include #include -#include #include #include #include @@ -50,68 +49,6 @@ namespace ripple { struct ValidatorBlobInfo; class SHAMap; -/** - * @class PeerImp - * @brief This class manages established peer-to-peer connections, handles - message exchange, monitors connection health, and graceful shutdown. - * - - * The PeerImp shutdown mechanism is a multi-stage process - * designed to ensure graceful connection termination while handling ongoing - * I/O operations safely. The shutdown can be initiated from multiple points - * and follows a deterministic state machine. - * - * The shutdown process can be triggered from several entry points: - * - **External requests**: `stop()` method called by overlay management - * - **Error conditions**: `fail(error_code)` or `fail(string)` on protocol - * violations - * - **Timer expiration**: Various timeout scenarios (ping timeout, large send - * queue) - * - **Connection health**: Peer tracking divergence or unknown state timeouts - * - * The shutdown follows this progression: - * - * Normal Operation → shutdown() → tryAsyncShutdown() → onShutdown() → close() - * ↓ ↓ ↓ ↓ - * Set shutdown_ SSL graceful Timer cancel Socket close - * Cancel timer shutdown start & cleanup & metrics - * 5s safety timer Set shutdownStarted_ update - * - * Two primary flags coordinate the shutdown process: - * - `shutdown_`: Set when shutdown is requested - * - `shutdownStarted_`: Set when SSL shutdown begins - * - * The shutdown mechanism carefully coordinates with ongoing read/write - * operations: - * - * **Read Operations (`onReadMessage`)**: - * - Checks `shutdown_` flag after processing each message batch - * - If shutdown initiated during processing, calls `tryAsyncShutdown()` - * - * **Write Operations (`onWriteMessage`)**: - * - Checks `shutdown_` flag before queuing new writes - * - Calls `tryAsyncShutdown()` when shutdown flag detected - * - * Multiple timers require coordination during shutdown: - * 1. **Peer Timer**: Regular ping/pong timer cancelled immediately in - * `shutdown()` - * 2. **Shutdown Timer**: 5-second safety timer ensures shutdown completion - * 3. **Operation Cancellation**: All pending async operations are cancelled - * - * The shutdown implements fallback mechanisms: - * - **Graceful Path**: SSL shutdown → Socket close → Cleanup - * - **Forced Path**: If SSL shutdown fails or times out, proceeds to socket - * close - * - **Safety Timer**: 5-second timeout prevents hanging shutdowns - * - * All shutdown operations are serialized through the boost::asio::strand to - * ensure thread safety. The strand guarantees that shutdown state changes - * and I/O operation callbacks are executed sequentially. - * - * @note This class requires careful coordination between async operations, - * timer management, and shutdown procedures to ensure no resource leaks - * or hanging connections in high-throughput networking scenarios. - */ class PeerImp : public Peer, public std::enable_shared_from_this, public OverlayImpl::Child @@ -142,8 +79,6 @@ private: socket_type& socket_; stream_type& stream_; boost::asio::strand strand_; - - // Multi-purpose timer for peer activity monitoring and shutdown safety waitable_timer timer_; // Updated at each stage of the connection process to reflect @@ -160,6 +95,7 @@ private: std::atomic tracking_; clock_type::time_point trackingTime_; + bool detaching_ = false; // Node public key of peer. PublicKey const publicKey_; std::string name_; @@ -239,19 +175,7 @@ private: http_response_type response_; boost::beast::http::fields const& headers_; std::queue> send_queue_; - - // Primary shutdown flag set when shutdown is requested - bool shutdown_ = false; - - // SSL shutdown coordination flag - bool shutdownStarted_ = false; - - // Indicates a read operation is currently pending - bool readPending_ = false; - - // Indicates a write operation is currently pending - bool writePending_ = false; - + bool gracefulClose_ = false; int large_sendq_ = 0; std::unique_ptr load_event_; // The highest sequence of each PublisherList that has @@ -501,6 +425,9 @@ public: bool isHighLatency() const override; + void + fail(std::string const& reason); + bool compressionEnabled() const override { @@ -514,129 +441,32 @@ public: } private: - /** - * @brief Handles a failure associated with a specific error code. - * - * This function is called when an operation fails with an error code. It - * logs the warning message and gracefully shutdowns the connection. - * - * The function will do nothing if the connection is already closed or if a - * shutdown is already in progress. - * - * @param name The name of the operation that failed (e.g., "read", - * "write"). - * @param ec The error code associated with the failure. - * @note This function must be called from within the object's strand. - */ - void - fail(std::string const& name, error_code ec); - - /** - * @brief Handles a failure described by a reason string. - * - * This overload is used for logical errors or protocol violations not - * associated with a specific error code. It logs a warning with the - * given reason, then initiates a graceful shutdown. - * - * The function will do nothing if the connection is already closed or if a - * shutdown is already in progress. - * - * @param reason A descriptive string explaining the reason for the failure. - * @note This function must be called from within the object's strand. - */ - void - fail(std::string const& reason); - - /** @brief Initiates the peer disconnection sequence. - * - * This is the primary entry point to start closing a peer connection. It - * marks the peer for shutdown and cancels any outstanding asynchronous - * operations. This cancellation allows the graceful shutdown to proceed - * once the handlers for the cancelled operations have completed. - * - * @note This method must be called on the peer's strand. - */ - void - shutdown(); - - /** @brief Attempts to perform a graceful SSL shutdown if conditions are - * met. - * - * This helper function checks if the peer is in a state where a graceful - * SSL shutdown can be performed (i.e., shutdown has been requested and no - * I/O operations are currently in progress). - * - * @note This method must be called on the peer's strand. - */ - void - tryAsyncShutdown(); - - /** - * @brief Handles the completion of the asynchronous SSL shutdown. - * - * This function is the callback for the `async_shutdown` operation started - * in `shutdown()`. Its first action is to cancel the timer. It - * then inspects the error code to determine the outcome. - * - * Regardless of the result, this function proceeds to call `close()` to - * ensure the underlying socket is fully closed. - * - * @param ec The error code resulting from the `async_shutdown` operation. - */ - void - onShutdown(error_code ec); - - /** - * @brief Forcibly closes the underlying socket connection. - * - * This function provides the final, non-graceful shutdown of the peer - * connection. It ensures any pending timers are cancelled and then - * immediately closes the TCP socket, bypassing the SSL shutdown handshake. - * - * After closing, it notifies the overlay manager of the disconnection. - * - * @note This function must be called from within the object's strand. - */ void close(); - /** - * @brief Sets and starts the peer timer. - * - * This function starts timer, which is used to detect inactivity - * and prevent stalled connections. It sets the timer to expire after the - * predefined `peerTimerInterval`. - * - * @note This function will terminate the connection in case of any errors. - */ void - setTimer(std::chrono::seconds interval); + fail(std::string const& name, error_code ec); - /** - * @brief Handles the expiration of the peer activity timer. - * - * This callback is invoked when the timer set by `setTimer` expires. It - * watches the peer connection, checking for various timeout and health - * conditions. - * - * @param ec The error code associated with the timer's expiration. - * `operation_aborted` is expected if the timer was cancelled. - */ void - onTimer(error_code const& ec); + gracefulClose(); - /** - * @brief Cancels any pending wait on the peer activity timer. - * - * This function is called to stop the timer. It gracefully manages any - * errors that might occur during the cancellation process. - */ void - cancelTimer() noexcept; + setTimer(); + + void + cancelTimer(); static std::string makePrefix(id_t id); + // Called when the timer wait completes + void + onTimer(boost::system::error_code const& ec); + + // Called when SSL shutdown completes + void + onShutdown(error_code ec); + void doAccept();