From 90c74454c875a079d338af7696dd2be5e59aa0f9 Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Wed, 10 Sep 2025 13:48:25 +0200 Subject: [PATCH] introduces hybrid connection timeout to outbound connection attempts --- src/xrpld/overlay/detail/ConnectAttempt.cpp | 98 +++++++-- src/xrpld/overlay/detail/ConnectAttempt.h | 217 +++++++++++++++++--- src/xrpld/overlay/detail/PeerImp.cpp | 4 + 3 files changed, 273 insertions(+), 46 deletions(-) diff --git a/src/xrpld/overlay/detail/ConnectAttempt.cpp b/src/xrpld/overlay/detail/ConnectAttempt.cpp index 2555221f1f..3bb3083680 100644 --- a/src/xrpld/overlay/detail/ConnectAttempt.cpp +++ b/src/xrpld/overlay/detail/ConnectAttempt.cpp @@ -47,6 +47,7 @@ ConnectAttempt::ConnectAttempt( , usage_(usage) , strand_(boost::asio::make_strand(io_context)) , timer_(io_context) + , stepTimer_(io_context) , stream_ptr_(std::make_unique( socket_type(std::forward(io_context)), *context)) @@ -91,7 +92,7 @@ ConnectAttempt::run() ioPending_ = true; // Allow up to connectTimeout_ seconds to establish remote peer connection - setTimer(); + setTimer(ConnectionStep::TcpConnect); stream_.next_layer().async_connect( remote_endpoint_, @@ -206,24 +207,70 @@ ConnectAttempt::fail(std::string const& name, error_code ec) } void -ConnectAttempt::setTimer() +ConnectAttempt::setTimer(ConnectionStep step) { + currentStep_ = step; + + // Set global timer (only if not already set) + if (timer_.expiry() == std::chrono::steady_clock::time_point{}) + { + try + { + timer_.expires_after(connectTimeout); + timer_.async_wait(boost::asio::bind_executor( + strand_, + std::bind( + &ConnectAttempt::onTimer, + shared_from_this(), + std::placeholders::_1))); + } + catch (std::exception const& ex) + { + JLOG(journal_.error()) << "setTimer (global): " << ex.what(); + return close(); + } + } + + // Set step-specific timer try { - timer_.expires_after(connectTimeout); + std::chrono::seconds stepTimeout; + switch (step) + { + case ConnectionStep::TcpConnect: + stepTimeout = StepTimeouts::tcpConnect; + break; + case ConnectionStep::TlsHandshake: + stepTimeout = StepTimeouts::tlsHandshake; + break; + case ConnectionStep::HttpWrite: + stepTimeout = StepTimeouts::httpWrite; + break; + case ConnectionStep::HttpRead: + stepTimeout = StepTimeouts::httpRead; + break; + case ConnectionStep::Complete: + return; // No timer needed for complete step + } + + // call to expires_after cancels previous timer + stepTimer_.expires_after(stepTimeout); + stepTimer_.async_wait(boost::asio::bind_executor( + strand_, + std::bind( + &ConnectAttempt::onTimer, + shared_from_this(), + std::placeholders::_1))); + + JLOG(journal_.trace()) << "setTimer: " << stepToString(step) + << " timeout=" << stepTimeout.count() << "s"; } catch (std::exception const& ex) { - JLOG(journal_.error()) << "setTimer: " << ex.what(); + JLOG(journal_.error()) + << "setTimer (step " << stepToString(step) << "): " << ex.what(); return close(); } - - timer_.async_wait(boost::asio::bind_executor( - strand_, - std::bind( - &ConnectAttempt::onTimer, - shared_from_this(), - std::placeholders::_1))); } void @@ -232,6 +279,7 @@ ConnectAttempt::cancelTimer() try { timer_.cancel(); + stepTimer_.cancel(); } catch (boost::system::system_error const&) { @@ -256,7 +304,26 @@ ConnectAttempt::onTimer(error_code ec) return close(); } - JLOG(journal_.debug()) << "onTimer: Timeout"; + // Determine which timer expired by checking their expiry times + auto const now = std::chrono::steady_clock::now(); + bool globalExpired = (timer_.expiry() <= now); + bool stepExpired = (stepTimer_.expiry() <= now); + + if (globalExpired) + { + JLOG(journal_.debug()) + << "onTimer: Global timeout; step: " << stepToString(currentStep_); + } + else if (stepExpired) + { + JLOG(journal_.debug()) + << "onTimer: Step timeout; step: " << stepToString(currentStep_); + } + else + { + JLOG(journal_.warn()) << "onTimer: Unexpected timer callback"; + } + close(); } @@ -286,6 +353,8 @@ ConnectAttempt::onConnect(error_code ec) ioPending_ = true; + setTimer(ConnectionStep::TlsHandshake); + stream_.set_verify_mode(boost::asio::ssl::verify_none); stream_.async_handshake( boost::asio::ssl::stream_base::client, @@ -345,6 +414,8 @@ ConnectAttempt::onHandshake(error_code ec) ioPending_ = true; + setTimer(ConnectionStep::HttpWrite); + boost::beast::http::async_write( stream_, req_, @@ -374,6 +445,8 @@ ConnectAttempt::onWrite(error_code ec) ioPending_ = true; + setTimer(ConnectionStep::HttpRead); + boost::beast::http::async_read( stream_, read_buf_, @@ -391,6 +464,7 @@ ConnectAttempt::onRead(error_code ec) { cancelTimer(); ioPending_ = false; + currentStep_ = ConnectionStep::Complete; if (ec) { diff --git a/src/xrpld/overlay/detail/ConnectAttempt.h b/src/xrpld/overlay/detail/ConnectAttempt.h index 9884a5b725..aa79cb9a07 100644 --- a/src/xrpld/overlay/detail/ConnectAttempt.h +++ b/src/xrpld/overlay/detail/ConnectAttempt.h @@ -26,99 +26,248 @@ namespace ripple { -/** Manages an outbound connection attempt. */ +/** + * @class ConnectAttempt + * @brief Manages outbound peer connection attempts with comprehensive timeout + * handling + * + * The ConnectAttempt class handles the complete lifecycle of establishing an + * outbound connection to a peer in the XRPL network. It implements a + * sophisticated dual-timer system that provides both global timeout protection + * and per-step timeout diagnostics. + * + * The connection establishment follows these steps: + * 1. **TCP Connect**: Establish basic network connection + * 2. **TLS Handshake**: Negotiate SSL/TLS encryption + * 3. **HTTP Write**: Send peer handshake request + * 4. **HTTP Read**: Receive and validate peer response + * 5. **Complete**: Connection successfully established + * + * Uses a hybrid timeout approach: + * - **Global Timer**: Hard limit (20s) for entire connection process + * - **Step Timers**: Individual timeouts for each connection phase + * + * - All errors result in connection termination + * + * All operations are serialized using boost::asio::strand to ensure thread + * safety. The class is designed to be used exclusively within the ASIO event + * loop. + * + * @note This class should not be used directly. It is managed by OverlayImpl + * as part of the peer discovery and connection management system. + * + */ class ConnectAttempt : public OverlayImpl::Child, public std::enable_shared_from_this { private: using error_code = boost::system::error_code; - using endpoint_type = boost::asio::ip::tcp::endpoint; - using request_type = boost::beast::http::request; - using response_type = boost::beast::http::response; - using socket_type = boost::asio::ip::tcp::socket; using middle_type = boost::beast::tcp_stream; using stream_type = boost::beast::ssl_stream; using shared_context = std::shared_ptr; - static constexpr std::chrono::seconds connectTimeout{15}; + /** + * @enum ConnectionStep + * @brief Represents the current phase of the connection establishment + * process + * + * Used for tracking progress and providing detailed timeout diagnostics. + * Each step has its own timeout value defined in StepTimeouts. + */ + enum class ConnectionStep { + TcpConnect, // Establishing TCP connection to remote peer + TlsHandshake, // Performing SSL/TLS handshake + HttpWrite, // Sending HTTP upgrade request + HttpRead, // Reading HTTP upgrade response + Complete // Connection successfully established + }; + // Global timeout for entire connection process (hard limit) + static constexpr std::chrono::seconds connectTimeout{20}; + + /** + * @struct StepTimeouts + * @brief Defines timeout values for each connection step + * + * These timeouts are designed to detect slow individual phases while + * allowing the global timeout to enforce the overall time limit. + */ + struct StepTimeouts + { + static constexpr std::chrono::seconds tcpConnect{ + 8}; // TCP connection timeout + static constexpr std::chrono::seconds tlsHandshake{ + 5}; // SSL handshake timeout + static constexpr std::chrono::seconds httpWrite{ + 2}; // HTTP write timeout + static constexpr std::chrono::seconds httpRead{3}; // HTTP read timeout + }; + + // Core application and networking components Application& app_; - std::uint32_t const id_; + Peer::id_t const id_; beast::WrappedSink sink_; beast::Journal const journal_; endpoint_type remote_endpoint_; Resource::Consumer usage_; + boost::asio::strand strand_; boost::asio::basic_waitable_timer timer_; - std::unique_ptr stream_ptr_; + boost::asio::basic_waitable_timer stepTimer_; + + std::unique_ptr stream_ptr_; // SSL stream (owned) socket_type& socket_; stream_type& stream_; boost::beast::multi_buffer read_buf_; + response_type response_; std::shared_ptr slot_; request_type req_; - bool shutdown_ = false; - bool ioPending_ = false; - bool shutdownStarted_ = false; - bool handshakeComplete_ = false; + + bool shutdown_ = false; // Shutdown has been initiated + bool ioPending_ = false; // Async I/O operation in progress + bool shutdownStarted_ = false; // SSL shutdown in progress + bool handshakeComplete_ = false; // SSL handshake completed successfully + ConnectionStep currentStep_ = + ConnectionStep::TcpConnect; // Current connection phase public: + /** + * @brief Construct a new ConnectAttempt object + * + * @param app Application context providing configuration and services + * @param io_context ASIO I/O context for async operations + * @param remote_endpoint Target peer endpoint to connect to + * @param usage Resource usage tracker for rate limiting + * @param context Shared SSL context for encryption + * @param id Unique peer identifier for this connection attempt + * @param slot PeerFinder slot representing this connection + * @param journal Logging interface for diagnostics + * @param overlay Parent overlay manager + * + * @note The constructor only initializes the object. Call run() to begin + * the actual connection attempt. + */ ConnectAttempt( Application& app, boost::asio::io_context& io_context, endpoint_type const& remote_endpoint, Resource::Consumer usage, shared_context const& context, - std::uint32_t id, + Peer::id_t id, std::shared_ptr const& slot, beast::Journal journal, OverlayImpl& overlay); ~ConnectAttempt(); + /** + * @brief Stop the connection attempt + * + * This method is thread-safe and can be called from any thread. + */ void stop() override; + /** + * @brief Begin the connection attempt + * + * This method is thread-safe and posts to the strand if needed. + */ void run(); private: + /** + * @brief Set timers for the specified connection step + * + * @param step The connection step to set timers for + * + * Sets both the step-specific timer and the global timer (if not already + * set). + */ void - setTimer(); + setTimer(ConnectionStep step); + + /** + * @brief Cancel both global and step timers + * + * Used during cleanup and when connection completes successfully. + * Exceptions from timer cancellation are safely ignored. + */ void cancelTimer(); + + /** + * @brief Handle timer expiration events + * + * @param ec Error code from timer operation + * + * Determines which timer expired (global vs step) and logs appropriate + * diagnostic information before terminating the connection. + */ void onTimer(error_code ec); - void - onConnect(error_code ec); - void - onHandshake(error_code ec); - void - onWrite(error_code ec); - void - onRead(error_code ec); - void - fail(std::string const& reason); - void - fail(std::string const& name, error_code ec); - void - shutdown(); - void - tryAsyncShutdown(); - void - onShutdown(error_code ec); - void - close(); + // Connection phase handlers + void + onConnect(error_code ec); // TCP connection completion handler + void + onHandshake(error_code ec); // TLS handshake completion handler + void + onWrite(error_code ec); // HTTP write completion handler + void + onRead(error_code ec); // HTTP read completion handler + + // Error and cleanup handlers + void + fail(std::string const& reason); // Fail with custom reason + void + fail(std::string const& name, error_code ec); // Fail with system error + void + shutdown(); // Initiate graceful shutdown + void + tryAsyncShutdown(); // Attempt async SSL shutdown + void + onShutdown(error_code ec); // SSL shutdown completion handler + void + close(); // Force close socket + + /** + * @brief Process the HTTP upgrade response from peer + * + * Validates the peer's response, extracts protocol information, + * verifies handshake, and either creates a PeerImp or handles + * redirect responses. + */ void processResponse(); + static std::string + stepToString(ConnectionStep step) + { + switch (step) + { + case ConnectAttempt::ConnectionStep::TcpConnect: + return "TcpConnect"; + case ConnectAttempt::ConnectionStep::TlsHandshake: + return "TlsHandshake"; + case ConnectAttempt::ConnectionStep::HttpWrite: + return "HttpWrite"; + case ConnectAttempt::ConnectionStep::HttpRead: + return "HttpRead"; + case ConnectAttempt::ConnectionStep::Complete: + return "Complete"; + } + return "Unknown"; + }; + template static boost::asio::ip::tcp::endpoint parse_endpoint(std::string const& s, boost::system::error_code& ec) diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index 31e81cb9a9..09e5c1f7df 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -733,6 +733,10 @@ PeerImp::setTimer(std::chrono::seconds interval) void PeerImp::onTimer(error_code const& ec) { + XRPL_ASSERT( + strand_.running_in_this_thread(), + "ripple::PeerImp::onTimer : strand in this thread"); + if (!socket_.is_open()) return;