diff --git a/src/xrpld/overlay/detail/ConnectAttempt.cpp b/src/xrpld/overlay/detail/ConnectAttempt.cpp index 397ac06ba6..c1bc4bb069 100644 --- a/src/xrpld/overlay/detail/ConnectAttempt.cpp +++ b/src/xrpld/overlay/detail/ConnectAttempt.cpp @@ -24,6 +24,8 @@ #include +#include + namespace ripple { ConnectAttempt::ConnectAttempt( @@ -45,6 +47,7 @@ ConnectAttempt::ConnectAttempt( , usage_(usage) , strand_(boost::asio::make_strand(io_context)) , timer_(io_context) + , stepTimer_(io_context) , stream_ptr_(std::make_unique( socket_type(std::forward(io_context)), *context)) @@ -52,14 +55,14 @@ ConnectAttempt::ConnectAttempt( , stream_(*stream_ptr_) , slot_(slot) { - JLOG(journal_.debug()) << "Connect " << remote_endpoint; } ConnectAttempt::~ConnectAttempt() { + // slot_ will be null if we successfully connected + // and transferred ownership to a PeerImp if (slot_ != nullptr) overlay_.peerFinder().on_closed(slot_); - JLOG(journal_.trace()) << "~ConnectAttempt"; } void @@ -68,16 +71,29 @@ ConnectAttempt::stop() if (!strand_.running_in_this_thread()) return boost::asio::post( strand_, std::bind(&ConnectAttempt::stop, shared_from_this())); - if (socket_.is_open()) - { - JLOG(journal_.debug()) << "Stop"; - } - close(); + + if (!socket_.is_open()) + return; + + JLOG(journal_.debug()) << "stop: Stop"; + + shutdown(); } void ConnectAttempt::run() { + if (!strand_.running_in_this_thread()) + return boost::asio::post( + strand_, std::bind(&ConnectAttempt::run, shared_from_this())); + + JLOG(journal_.debug()) << "run: connecting to " << remote_endpoint_; + + ioPending_ = true; + + // Allow up to connectTimeout_ seconds to establish remote peer connection + setTimer(ConnectionStep::TcpConnect); + stream_.next_layer().async_connect( remote_endpoint_, boost::asio::bind_executor( @@ -90,61 +106,177 @@ ConnectAttempt::run() //------------------------------------------------------------------------------ +void +ConnectAttempt::shutdown() +{ + XRPL_ASSERT( + strand_.running_in_this_thread(), + "ripple::ConnectAttempt::shutdown: strand in this thread"); + + if (!socket_.is_open()) + return; + + shutdown_ = true; + boost::beast::get_lowest_layer(stream_).cancel(); + + tryAsyncShutdown(); +} + +void +ConnectAttempt::tryAsyncShutdown() +{ + XRPL_ASSERT( + strand_.running_in_this_thread(), + "ripple::ConnectAttempt::tryAsyncShutdown : strand in this thread"); + + if (!shutdown_ || currentStep_ == ConnectionStep::ShutdownStarted) + return; + + if (ioPending_) + return; + + // gracefully shutdown the SSL socket, performing a shutdown handshake + if (currentStep_ != ConnectionStep::TcpConnect && + currentStep_ != ConnectionStep::TlsHandshake) + { + setTimer(ConnectionStep::ShutdownStarted); + return stream_.async_shutdown(bind_executor( + strand_, + std::bind( + &ConnectAttempt::onShutdown, + shared_from_this(), + std::placeholders::_1))); + } + + close(); +} + +void +ConnectAttempt::onShutdown(error_code ec) +{ + cancelTimer(); + + if (ec) + { + // - eof: the stream was cleanly closed + // - operation_aborted: an expired timer (slow shutdown) + // - stream_truncated: the tcp connection closed (no handshake) it could + // occur if a peer does not perform a graceful disconnect + // - broken_pipe: the peer is gone + // - application data after close notify: benign SSL shutdown condition + bool shouldLog = + (ec != boost::asio::error::eof && + ec != boost::asio::error::operation_aborted && + ec.message().find("application data after close notify") == + std::string::npos); + + if (shouldLog) + { + JLOG(journal_.debug()) << 
"onShutdown: " << ec.message(); + } + } + + close(); +} + void ConnectAttempt::close() { XRPL_ASSERT( strand_.running_in_this_thread(), "ripple::ConnectAttempt::close : strand in this thread"); - if (socket_.is_open()) - { - try - { - timer_.cancel(); - socket_.close(); - } - catch (boost::system::system_error const&) - { - // ignored - } + if (!socket_.is_open()) + return; - JLOG(journal_.debug()) << "Closed"; - } + cancelTimer(); + + error_code ec; + socket_.close(ec); } void ConnectAttempt::fail(std::string const& reason) { JLOG(journal_.debug()) << reason; - close(); + shutdown(); } void ConnectAttempt::fail(std::string const& name, error_code ec) { JLOG(journal_.debug()) << name << ": " << ec.message(); - close(); + shutdown(); } void -ConnectAttempt::setTimer() +ConnectAttempt::setTimer(ConnectionStep step) { - try + currentStep_ = step; + + // Set global timer (only if not already set) + if (timer_.expiry() == std::chrono::steady_clock::time_point{}) { - timer_.expires_after(std::chrono::seconds(15)); - } - catch (boost::system::system_error const& e) - { - JLOG(journal_.error()) << "setTimer: " << e.code(); - return; + try + { + timer_.expires_after(connectTimeout); + timer_.async_wait(boost::asio::bind_executor( + strand_, + std::bind( + &ConnectAttempt::onTimer, + shared_from_this(), + std::placeholders::_1))); + } + catch (std::exception const& ex) + { + JLOG(journal_.error()) << "setTimer (global): " << ex.what(); + return close(); + } } - timer_.async_wait(boost::asio::bind_executor( - strand_, - std::bind( - &ConnectAttempt::onTimer, - shared_from_this(), - std::placeholders::_1))); + // Set step-specific timer + try + { + std::chrono::seconds stepTimeout; + switch (step) + { + case ConnectionStep::TcpConnect: + stepTimeout = StepTimeouts::tcpConnect; + break; + case ConnectionStep::TlsHandshake: + stepTimeout = StepTimeouts::tlsHandshake; + break; + case ConnectionStep::HttpWrite: + stepTimeout = StepTimeouts::httpWrite; + break; + case ConnectionStep::HttpRead: + stepTimeout = StepTimeouts::httpRead; + break; + case ConnectionStep::ShutdownStarted: + stepTimeout = StepTimeouts::tlsShutdown; + break; + case ConnectionStep::Complete: + case ConnectionStep::Init: + return; // No timer needed for init or complete step + } + + // call to expires_after cancels previous timer + stepTimer_.expires_after(stepTimeout); + stepTimer_.async_wait(boost::asio::bind_executor( + strand_, + std::bind( + &ConnectAttempt::onTimer, + shared_from_this(), + std::placeholders::_1))); + + JLOG(journal_.trace()) << "setTimer: " << stepToString(step) + << " timeout=" << stepTimeout.count() << "s"; + } + catch (std::exception const& ex) + { + JLOG(journal_.error()) + << "setTimer (step " << stepToString(step) << "): " << ex.what(); + return close(); + } } void @@ -153,6 +285,7 @@ ConnectAttempt::cancelTimer() try { timer_.cancel(); + stepTimer_.cancel(); } catch (boost::system::system_error const&) { @@ -165,34 +298,69 @@ ConnectAttempt::onTimer(error_code ec) { if (!socket_.is_open()) return; - if (ec == boost::asio::error::operation_aborted) - return; + if (ec) { + // do not initiate shutdown, timers are frequently cancelled + if (ec == boost::asio::error::operation_aborted) + return; + // This should never happen JLOG(journal_.error()) << "onTimer: " << ec.message(); return close(); } - fail("Timeout"); + + // Determine which timer expired by checking their expiry times + auto const now = std::chrono::steady_clock::now(); + bool globalExpired = (timer_.expiry() <= now); + bool stepExpired = 
(stepTimer_.expiry() <= now); + + if (globalExpired) + { + JLOG(journal_.debug()) + << "onTimer: Global timeout; step: " << stepToString(currentStep_); + } + else if (stepExpired) + { + JLOG(journal_.debug()) + << "onTimer: Step timeout; step: " << stepToString(currentStep_); + } + else + { + JLOG(journal_.warn()) << "onTimer: Unexpected timer callback"; + } + + close(); } void ConnectAttempt::onConnect(error_code ec) { - cancelTimer(); + ioPending_ = false; - if (ec == boost::asio::error::operation_aborted) - return; - endpoint_type local_endpoint; - if (!ec) - local_endpoint = socket_.local_endpoint(ec); if (ec) + { + if (ec == boost::asio::error::operation_aborted) + return tryAsyncShutdown(); + return fail("onConnect", ec); + } + if (!socket_.is_open()) return; - JLOG(journal_.trace()) << "onConnect"; - setTimer(); + // check if connection has really been established + socket_.local_endpoint(ec); + if (ec) + return fail("onConnect", ec); + + if (shutdown_) + return tryAsyncShutdown(); + + ioPending_ = true; + + setTimer(ConnectionStep::TlsHandshake); + stream_.set_verify_mode(boost::asio::ssl::verify_none); stream_.async_handshake( boost::asio::ssl::stream_base::client, @@ -207,25 +375,30 @@ ConnectAttempt::onConnect(error_code ec) void ConnectAttempt::onHandshake(error_code ec) { - cancelTimer(); - if (!socket_.is_open()) - return; - if (ec == boost::asio::error::operation_aborted) - return; - endpoint_type local_endpoint; - if (!ec) - local_endpoint = socket_.local_endpoint(ec); + ioPending_ = false; + + if (ec) + { + if (ec == boost::asio::error::operation_aborted) + return tryAsyncShutdown(); + + return fail("onHandshake", ec); + } + + auto const local_endpoint = socket_.local_endpoint(ec); if (ec) return fail("onHandshake", ec); - JLOG(journal_.trace()) << "onHandshake"; + setTimer(ConnectionStep::HttpWrite); + + // check if we connected to ourselves if (!overlay_.peerFinder().onConnected( slot_, beast::IPAddressConversion::from_asio(local_endpoint))) - return fail("Duplicate connection"); + return fail("Self connection"); auto const sharedValue = makeSharedValue(*stream_ptr_, journal_); if (!sharedValue) - return close(); // makeSharedValue logs + return shutdown(); // makeSharedValue logs req_ = makeRequest( !overlay_.peerFinder().config().peerPrivate, @@ -242,7 +415,11 @@ ConnectAttempt::onHandshake(error_code ec) remote_endpoint_.address(), app_); - setTimer(); + if (shutdown_) + return tryAsyncShutdown(); + + ioPending_ = true; + boost::beast::http::async_write( stream_, req_, @@ -257,13 +434,23 @@ ConnectAttempt::onHandshake(error_code ec) void ConnectAttempt::onWrite(error_code ec) { - cancelTimer(); - if (!socket_.is_open()) - return; - if (ec == boost::asio::error::operation_aborted) - return; + ioPending_ = false; + if (ec) + { + if (ec == boost::asio::error::operation_aborted) + return tryAsyncShutdown(); + return fail("onWrite", ec); + } + + if (shutdown_) + return tryAsyncShutdown(); + + ioPending_ = true; + + setTimer(ConnectionStep::HttpRead); + boost::beast::http::async_read( stream_, read_buf_, @@ -280,39 +467,27 @@ void ConnectAttempt::onRead(error_code ec) { cancelTimer(); + ioPending_ = false; + currentStep_ = ConnectionStep::Complete; - if (!socket_.is_open()) - return; - if (ec == boost::asio::error::operation_aborted) - return; - if (ec == boost::asio::error::eof) - { - JLOG(journal_.info()) << "EOF"; - setTimer(); - return stream_.async_shutdown(boost::asio::bind_executor( - strand_, - std::bind( - &ConnectAttempt::onShutdown, - shared_from_this(), - 
std::placeholders::_1))); - } if (ec) - return fail("onRead", ec); - processResponse(); -} - -void -ConnectAttempt::onShutdown(error_code ec) -{ - cancelTimer(); - if (!ec) { - JLOG(journal_.error()) << "onShutdown: expected error condition"; - return close(); + if (ec == boost::asio::error::eof) + { + JLOG(journal_.debug()) << "EOF"; + return shutdown(); + } + + if (ec == boost::asio::error::operation_aborted) + return tryAsyncShutdown(); + + return fail("onRead", ec); } - if (ec != boost::asio::error::eof) - return fail("onShutdown", ec); - close(); + + if (shutdown_) + return tryAsyncShutdown(); + + processResponse(); } //-------------------------------------------------------------------------- @@ -320,48 +495,69 @@ ConnectAttempt::onShutdown(error_code ec) void ConnectAttempt::processResponse() { - if (response_.result() == boost::beast::http::status::service_unavailable) - { - Json::Value json; - Json::Reader r; - std::string s; - s.reserve(boost::asio::buffer_size(response_.body().data())); - for (auto const buffer : response_.body().data()) - s.append( - static_cast(buffer.data()), - boost::asio::buffer_size(buffer)); - auto const success = r.parse(s, json); - if (success) - { - if (json.isObject() && json.isMember("peer-ips")) - { - Json::Value const& ips = json["peer-ips"]; - if (ips.isArray()) - { - std::vector eps; - eps.reserve(ips.size()); - for (auto const& v : ips) - { - if (v.isString()) - { - error_code ec; - auto const ep = parse_endpoint(v.asString(), ec); - if (!ec) - eps.push_back(ep); - } - } - overlay_.peerFinder().onRedirects(remote_endpoint_, eps); - } - } - } - } - if (!OverlayImpl::isPeerUpgrade(response_)) { - JLOG(journal_.info()) - << "Unable to upgrade to peer protocol: " << response_.result() - << " (" << response_.reason() << ")"; - return close(); + // A peer may respond with service_unavailable and a list of alternative + // peers to connect to, a differing status code is unexpected + if (response_.result() != + boost::beast::http::status::service_unavailable) + { + JLOG(journal_.warn()) + << "Unable to upgrade to peer protocol: " << response_.result() + << " (" << response_.reason() << ")"; + return shutdown(); + } + + // Parse response body to determine if this is a redirect or other + // service unavailable + std::string responseBody; + responseBody.reserve(boost::asio::buffer_size(response_.body().data())); + for (auto const buffer : response_.body().data()) + responseBody.append( + static_cast(buffer.data()), + boost::asio::buffer_size(buffer)); + + Json::Value json; + Json::Reader reader; + auto const isValidJson = reader.parse(responseBody, json); + + // Check if this is a redirect response (contains peer-ips field) + auto const isRedirect = + isValidJson && json.isObject() && json.isMember("peer-ips"); + + if (!isRedirect) + { + JLOG(journal_.warn()) + << "processResponse: " << remote_endpoint_ + << " failed to upgrade to peer protocol: " << response_.result() + << " (" << response_.reason() << ")"; + + return shutdown(); + } + + Json::Value const& peerIps = json["peer-ips"]; + if (!peerIps.isArray()) + return fail("processResponse: invalid peer-ips format"); + + // Extract and validate peer endpoints + std::vector redirectEndpoints; + redirectEndpoints.reserve(peerIps.size()); + + for (auto const& ipValue : peerIps) + { + if (!ipValue.isString()) + continue; + + error_code ec; + auto const endpoint = parse_endpoint(ipValue.asString(), ec); + if (!ec) + redirectEndpoints.push_back(endpoint); + } + + // Notify PeerFinder about the redirect 
+    // redirectEndpoints may be empty
+    overlay_.peerFinder().onRedirects(remote_endpoint_, redirectEndpoints);
+
+    return fail("processResponse: failed to connect to peer: redirected");
     }
 
     // Just because our peer selected a particular protocol version doesn't
@@ -381,11 +577,11 @@ ConnectAttempt::processResponse()
 
     auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
     if (!sharedValue)
-        return close();  // makeSharedValue logs
+        return shutdown();  // makeSharedValue logs
 
     try
     {
-        auto publicKey = verifyHandshake(
+        auto const publicKey = verifyHandshake(
             response_,
             *sharedValue,
             overlay_.setup().networkID,
@@ -393,11 +589,10 @@
             remote_endpoint_.address(),
             app_);
 
-        JLOG(journal_.info())
-            << "Public Key: " << toBase58(TokenType::NodePublic, publicKey);
-
         JLOG(journal_.debug()) << "Protocol: " << to_string(*negotiatedProtocol);
 
+        JLOG(journal_.info())
+            << "Public Key: " << toBase58(TokenType::NodePublic, publicKey);
 
         auto const member = app_.cluster().member(publicKey);
         if (member)
@@ -405,10 +600,21 @@
             JLOG(journal_.info()) << "Cluster name: " << *member;
         }
 
-        auto const result = overlay_.peerFinder().activate(
-            slot_, publicKey, static_cast<bool>(member));
+        auto const result =
+            overlay_.peerFinder().activate(slot_, publicKey, !member->empty());
         if (result != PeerFinder::Result::success)
-            return fail("Outbound " + std::string(to_string(result)));
+        {
+            std::stringstream ss;
+            ss << "Outbound Connect Attempt " << remote_endpoint_ << " "
+               << to_string(result);
+            return fail(ss.str());
+        }
+
+        if (!socket_.is_open())
+            return;
+
+        if (shutdown_)
+            return tryAsyncShutdown();
 
         auto const peer = std::make_shared<PeerImp>(
             app_,
diff --git a/src/xrpld/overlay/detail/ConnectAttempt.h b/src/xrpld/overlay/detail/ConnectAttempt.h
index febbe88f45..38b9482d9d 100644
--- a/src/xrpld/overlay/detail/ConnectAttempt.h
+++ b/src/xrpld/overlay/detail/ConnectAttempt.h
@@ -22,90 +22,258 @@
 
 #include
 
+#include
+
 namespace ripple {
 
-/** Manages an outbound connection attempt. */
+/**
+ * @class ConnectAttempt
+ * @brief Manages outbound peer connection attempts with comprehensive timeout
+ *        handling
+ *
+ * The ConnectAttempt class handles the complete lifecycle of establishing an
+ * outbound connection to a peer in the XRPL network. It implements a
+ * sophisticated dual-timer system that provides both global timeout protection
+ * and per-step timeout diagnostics.
+ *
+ * The connection establishment follows these steps:
+ * 1. **TCP Connect**: Establish basic network connection
+ * 2. **TLS Handshake**: Negotiate SSL/TLS encryption
+ * 3. **HTTP Write**: Send peer handshake request
+ * 4. **HTTP Read**: Receive and validate peer response
+ * 5. **Complete**: Connection successfully established
+ *
+ * Uses a hybrid timeout approach:
+ * - **Global Timer**: Hard limit (25s, connectTimeout) for the entire
+ *   connection process
+ * - **Step Timers**: Individual timeouts for each connection phase
+ *
+ * All errors result in connection termination.
+ *
+ * All operations are serialized using boost::asio::strand to ensure thread
+ * safety. The class is designed to be used exclusively within the ASIO event
+ * loop.
+ *
+ * @note This class should not be used directly. It is managed by OverlayImpl
+ * as part of the peer discovery and connection management system.
+ * + */ class ConnectAttempt : public OverlayImpl::Child, public std::enable_shared_from_this { private: using error_code = boost::system::error_code; - using endpoint_type = boost::asio::ip::tcp::endpoint; - using request_type = boost::beast::http::request; - using response_type = boost::beast::http::response; - using socket_type = boost::asio::ip::tcp::socket; using middle_type = boost::beast::tcp_stream; using stream_type = boost::beast::ssl_stream; using shared_context = std::shared_ptr; + /** + * @enum ConnectionStep + * @brief Represents the current phase of the connection establishment + * process + * + * Used for tracking progress and providing detailed timeout diagnostics. + * Each step has its own timeout value defined in StepTimeouts. + */ + enum class ConnectionStep { + Init, // Initial state, nothing started + TcpConnect, // Establishing TCP connection to remote peer + TlsHandshake, // Performing SSL/TLS handshake + HttpWrite, // Sending HTTP upgrade request + HttpRead, // Reading HTTP upgrade response + Complete, // Connection successfully established + ShutdownStarted // Connection shutdown has started + }; + + // A timeout for connection process, greater than all step timeouts + static constexpr std::chrono::seconds connectTimeout{25}; + + /** + * @struct StepTimeouts + * @brief Defines timeout values for each connection step + * + * These timeouts are designed to detect slow individual phases while + * allowing the global timeout to enforce the overall time limit. + */ + struct StepTimeouts + { + // TCP connection timeout + static constexpr std::chrono::seconds tcpConnect{8}; + // SSL handshake timeout + static constexpr std::chrono::seconds tlsHandshake{8}; + // HTTP write timeout + static constexpr std::chrono::seconds httpWrite{3}; + // HTTP read timeout + static constexpr std::chrono::seconds httpRead{3}; + // SSL shutdown timeout + static constexpr std::chrono::seconds tlsShutdown{2}; + }; + + // Core application and networking components Application& app_; - std::uint32_t const id_; + Peer::id_t const id_; beast::WrappedSink sink_; beast::Journal const journal_; endpoint_type remote_endpoint_; Resource::Consumer usage_; + boost::asio::strand strand_; boost::asio::basic_waitable_timer timer_; - std::unique_ptr stream_ptr_; + boost::asio::basic_waitable_timer stepTimer_; + + std::unique_ptr stream_ptr_; // SSL stream (owned) socket_type& socket_; stream_type& stream_; boost::beast::multi_buffer read_buf_; + response_type response_; std::shared_ptr slot_; request_type req_; + bool shutdown_ = false; // Shutdown has been initiated + bool ioPending_ = false; // Async I/O operation in progress + ConnectionStep currentStep_ = ConnectionStep::Init; + public: + /** + * @brief Construct a new ConnectAttempt object + * + * @param app Application context providing configuration and services + * @param io_context ASIO I/O context for async operations + * @param remote_endpoint Target peer endpoint to connect to + * @param usage Resource usage tracker for rate limiting + * @param context Shared SSL context for encryption + * @param id Unique peer identifier for this connection attempt + * @param slot PeerFinder slot representing this connection + * @param journal Logging interface for diagnostics + * @param overlay Parent overlay manager + * + * @note The constructor only initializes the object. Call run() to begin + * the actual connection attempt. 
+ */ ConnectAttempt( Application& app, boost::asio::io_context& io_context, endpoint_type const& remote_endpoint, Resource::Consumer usage, shared_context const& context, - std::uint32_t id, + Peer::id_t id, std::shared_ptr const& slot, beast::Journal journal, OverlayImpl& overlay); ~ConnectAttempt(); + /** + * @brief Stop the connection attempt + * + * This method is thread-safe and can be called from any thread. + */ void stop() override; + /** + * @brief Begin the connection attempt + * + * This method is thread-safe and posts to the strand if needed. + */ void run(); private: + /** + * @brief Set timers for the specified connection step + * + * @param step The connection step to set timers for + * + * Sets both the step-specific timer and the global timer (if not already + * set). + */ void - close(); - void - fail(std::string const& reason); - void - fail(std::string const& name, error_code ec); - void - setTimer(); + setTimer(ConnectionStep step); + + /** + * @brief Cancel both global and step timers + * + * Used during cleanup and when connection completes successfully. + * Exceptions from timer cancellation are safely ignored. + */ void cancelTimer(); + + /** + * @brief Handle timer expiration events + * + * @param ec Error code from timer operation + * + * Determines which timer expired (global vs step) and logs appropriate + * diagnostic information before terminating the connection. + */ void onTimer(error_code ec); + + // Connection phase handlers void - onConnect(error_code ec); + onConnect(error_code ec); // TCP connection completion handler void - onHandshake(error_code ec); + onHandshake(error_code ec); // TLS handshake completion handler void - onWrite(error_code ec); + onWrite(error_code ec); // HTTP write completion handler void - onRead(error_code ec); + onRead(error_code ec); // HTTP read completion handler + + // Error and cleanup handlers void - onShutdown(error_code ec); + fail(std::string const& reason); // Fail with custom reason + void + fail(std::string const& name, error_code ec); // Fail with system error + void + shutdown(); // Initiate graceful shutdown + void + tryAsyncShutdown(); // Attempt async SSL shutdown + void + onShutdown(error_code ec); // SSL shutdown completion handler + void + close(); // Force close socket + + /** + * @brief Process the HTTP upgrade response from peer + * + * Validates the peer's response, extracts protocol information, + * verifies handshake, and either creates a PeerImp or handles + * redirect responses. 
+ */ void processResponse(); + static std::string + stepToString(ConnectionStep step) + { + switch (step) + { + case ConnectionStep::Init: + return "Init"; + case ConnectionStep::TcpConnect: + return "TcpConnect"; + case ConnectionStep::TlsHandshake: + return "TlsHandshake"; + case ConnectionStep::HttpWrite: + return "HttpWrite"; + case ConnectionStep::HttpRead: + return "HttpRead"; + case ConnectionStep::Complete: + return "Complete"; + case ConnectionStep::ShutdownStarted: + return "ShutdownStarted"; + } + return "Unknown"; + }; + template static boost::asio::ip::tcp::endpoint parse_endpoint(std::string const& s, boost::system::error_code& ec) diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index 2cd9432eb8..93371f42ab 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -44,6 +44,7 @@ #include #include +#include #include #include #include @@ -59,6 +60,10 @@ std::chrono::milliseconds constexpr peerHighLatency{300}; /** How often we PING the peer to check for latency and sendq probe */ std::chrono::seconds constexpr peerTimerInterval{60}; + +/** The timeout for a shutdown timer */ +std::chrono::seconds constexpr shutdownTimerInterval{5}; + } // namespace // TODO: Remove this exclusion once unit tests are added after the hotfix @@ -215,23 +220,17 @@ PeerImp::stop() { if (!strand_.running_in_this_thread()) return post(strand_, std::bind(&PeerImp::stop, shared_from_this())); - if (socket_.is_open()) - { - // The rationale for using different severity levels is that - // outbound connections are under our control and may be logged - // at a higher level, but inbound connections are more numerous and - // uncontrolled so to prevent log flooding the severity is reduced. - // - if (inbound_) - { - JLOG(journal_.debug()) << "Stop"; - } - else - { - JLOG(journal_.info()) << "Stop"; - } - } - close(); + + if (!socket_.is_open()) + return; + + // The rationale for using different severity levels is that + // outbound connections are under our control and may be logged + // at a higher level, but inbound connections are more numerous and + // uncontrolled so to prevent log flooding the severity is reduced. 
+ JLOG(journal_.debug()) << "stop: Stop"; + + shutdown(); } //------------------------------------------------------------------------------ @@ -241,11 +240,14 @@ PeerImp::send(std::shared_ptr const& m) { if (!strand_.running_in_this_thread()) return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m)); - if (gracefulClose_) - return; - if (detaching_) + + if (!socket_.is_open()) return; + // we are in progress of closing the connection + if (shutdown_) + return tryAsyncShutdown(); + auto validator = m->getValidatorKey(); if (validator && !squelch_.expireSquelch(*validator)) { @@ -287,6 +289,7 @@ PeerImp::send(std::shared_ptr const& m) if (sendq_size != 0) return; + writePending_ = true; boost::asio::async_write( stream_, boost::asio::buffer( @@ -573,34 +576,21 @@ PeerImp::hasRange(std::uint32_t uMin, std::uint32_t uMax) //------------------------------------------------------------------------------ void -PeerImp::close() +PeerImp::fail(std::string const& name, error_code ec) { XRPL_ASSERT( strand_.running_in_this_thread(), - "ripple::PeerImp::close : strand in this thread"); - if (socket_.is_open()) - { - detaching_ = true; // DEPRECATED - try - { - timer_.cancel(); - socket_.close(); - } - catch (boost::system::system_error const&) - { - // ignored - } + "ripple::PeerImp::fail : strand in this thread"); - overlay_.incPeerDisconnect(); - if (inbound_) - { - JLOG(journal_.debug()) << "Closed"; - } - else - { - JLOG(journal_.info()) << "Closed"; - } - } + if (!socket_.is_open()) + return; + + JLOG(journal_.warn()) << name << " from " + << toBase58(TokenType::NodePublic, publicKey_) + << " at " << remote_address_.to_string() << ": " + << ec.message(); + + shutdown(); } void @@ -613,45 +603,39 @@ PeerImp::fail(std::string const& reason) (void(Peer::*)(std::string const&)) & PeerImp::fail, shared_from_this(), reason)); - if (journal_.active(beast::severities::kWarning) && socket_.is_open()) + + if (!socket_.is_open()) + return; + + // Call to name() locks, log only if the message will be outputed + if (journal_.active(beast::severities::kWarning)) { std::string const n = name(); JLOG(journal_.warn()) << (n.empty() ? 
remote_address_.to_string() : n) << " failed: " << reason; } - close(); + + shutdown(); } void -PeerImp::fail(std::string const& name, error_code ec) +PeerImp::tryAsyncShutdown() { XRPL_ASSERT( strand_.running_in_this_thread(), - "ripple::PeerImp::fail : strand in this thread"); - if (socket_.is_open()) - { - JLOG(journal_.warn()) - << name << " from " << toBase58(TokenType::NodePublic, publicKey_) - << " at " << remote_address_.to_string() << ": " << ec.message(); - } - close(); -} + "ripple::PeerImp::tryAsyncShutdown : strand in this thread"); -void -PeerImp::gracefulClose() -{ - XRPL_ASSERT( - strand_.running_in_this_thread(), - "ripple::PeerImp::gracefulClose : strand in this thread"); - XRPL_ASSERT( - socket_.is_open(), "ripple::PeerImp::gracefulClose : socket is open"); - XRPL_ASSERT( - !gracefulClose_, - "ripple::PeerImp::gracefulClose : socket is not closing"); - gracefulClose_ = true; - if (send_queue_.size() > 0) + if (!shutdown_ || shutdownStarted_) return; - setTimer(); + + if (readPending_ || writePending_) + return; + + shutdownStarted_ = true; + + setTimer(shutdownTimerInterval); + + // gracefully shutdown the SSL socket, performing a shutdown handshake stream_.async_shutdown(bind_executor( strand_, std::bind( @@ -659,69 +643,125 @@ PeerImp::gracefulClose() } void -PeerImp::setTimer() +PeerImp::shutdown() +{ + XRPL_ASSERT( + strand_.running_in_this_thread(), + "ripple::PeerImp::shutdown: strand in this thread"); + + if (!socket_.is_open() || shutdown_) + return; + + shutdown_ = true; + + boost::beast::get_lowest_layer(stream_).cancel(); + + tryAsyncShutdown(); +} + +void +PeerImp::onShutdown(error_code ec) +{ + cancelTimer(); + if (ec) + { + // - eof: the stream was cleanly closed + // - operation_aborted: an expired timer (slow shutdown) + // - stream_truncated: the tcp connection closed (no handshake) it could + // occur if a peer does not perform a graceful disconnect + // - broken_pipe: the peer is gone + bool shouldLog = + (ec != boost::asio::error::eof && + ec != boost::asio::error::operation_aborted && + ec.message().find("application data after close notify") == + std::string::npos); + + if (shouldLog) + { + JLOG(journal_.debug()) << "onShutdown: " << ec.message(); + } + } + + close(); +} + +void +PeerImp::close() +{ + XRPL_ASSERT( + strand_.running_in_this_thread(), + "ripple::PeerImp::close : strand in this thread"); + + if (!socket_.is_open()) + return; + + cancelTimer(); + + error_code ec; + socket_.close(ec); + + overlay_.incPeerDisconnect(); + + // The rationale for using different severity levels is that + // outbound connections are under our control and may be logged + // at a higher level, but inbound connections are more numerous and + // uncontrolled so to prevent log flooding the severity is reduced. + JLOG((inbound_ ? 
journal_.debug() : journal_.info())) << "close: Closed"; +} + +//------------------------------------------------------------------------------ + +void +PeerImp::setTimer(std::chrono::seconds interval) { try { - timer_.expires_after(peerTimerInterval); + timer_.expires_after(interval); } - catch (boost::system::system_error const& e) + catch (std::exception const& ex) { - JLOG(journal_.error()) << "setTimer: " << e.code(); - return; + JLOG(journal_.error()) << "setTimer: " << ex.what(); + return shutdown(); } + timer_.async_wait(bind_executor( strand_, std::bind( &PeerImp::onTimer, shared_from_this(), std::placeholders::_1))); } -// convenience for ignoring the error code -void -PeerImp::cancelTimer() -{ - try - { - timer_.cancel(); - } - catch (boost::system::system_error const&) - { - // ignored - } -} - -//------------------------------------------------------------------------------ - -std::string -PeerImp::makePrefix(id_t id) -{ - std::stringstream ss; - ss << "[" << std::setfill('0') << std::setw(3) << id << "] "; - return ss.str(); -} - void PeerImp::onTimer(error_code const& ec) { - if (!socket_.is_open()) - return; + XRPL_ASSERT( + strand_.running_in_this_thread(), + "ripple::PeerImp::onTimer : strand in this thread"); - if (ec == boost::asio::error::operation_aborted) + if (!socket_.is_open()) return; if (ec) { + // do not initiate shutdown, timers are frequently cancelled + if (ec == boost::asio::error::operation_aborted) + return; + // This should never happen JLOG(journal_.error()) << "onTimer: " << ec.message(); return close(); } - if (large_sendq_++ >= Tuning::sendqIntervals) + // the timer expired before the shutdown completed + // force close the connection + if (shutdown_) { - fail("Large send queue"); - return; + JLOG(journal_.debug()) << "onTimer: shutdown timer expired"; + return close(); } + if (large_sendq_++ >= Tuning::sendqIntervals) + return fail("Large send queue"); + if (auto const t = tracking_.load(); !inbound_ && t != Tracking::converged) { clock_type::duration duration; @@ -737,17 +777,13 @@ PeerImp::onTimer(error_code const& ec) (duration > app_.config().MAX_UNKNOWN_TIME))) { overlay_.peerFinder().on_failure(slot_); - fail("Not useful"); - return; + return fail("Not useful"); } } // Already waiting for PONG if (lastPingSeq_) - { - fail("Ping Timeout"); - return; - } + return fail("Ping Timeout"); lastPingTime_ = clock_type::now(); lastPingSeq_ = rand_int(); @@ -758,22 +794,28 @@ PeerImp::onTimer(error_code const& ec) send(std::make_shared(message, protocol::mtPING)); - setTimer(); + setTimer(peerTimerInterval); } void -PeerImp::onShutdown(error_code ec) +PeerImp::cancelTimer() noexcept { - cancelTimer(); - // If we don't get eof then something went wrong - if (!ec) + try { - JLOG(journal_.error()) << "onShutdown: expected error condition"; - return close(); + timer_.cancel(); } - if (ec != boost::asio::error::eof) - return fail("onShutdown", ec); - close(); + catch (std::exception const& ex) + { + JLOG(journal_.error()) << "cancelTimer: " << ex.what(); + } +} + +std::string +PeerImp::makePrefix(id_t id) +{ + std::stringstream ss; + ss << "[" << std::setfill('0') << std::setw(3) << id << "] "; + return ss.str(); } //------------------------------------------------------------------------------ @@ -786,6 +828,10 @@ PeerImp::doAccept() JLOG(journal_.debug()) << "doAccept: " << remote_address_; + // a shutdown was initiated before the handshake, there is nothing to do + if (shutdown_) + return tryAsyncShutdown(); + auto const sharedValue = 
makeSharedValue(*stream_ptr_, journal_); // This shouldn't fail since we already computed @@ -793,7 +839,7 @@ PeerImp::doAccept() if (!sharedValue) return fail("makeSharedValue: Unexpected failure"); - JLOG(journal_.info()) << "Protocol: " << to_string(protocol_); + JLOG(journal_.debug()) << "Protocol: " << to_string(protocol_); JLOG(journal_.info()) << "Public Key: " << toBase58(TokenType::NodePublic, publicKey_); @@ -836,7 +882,7 @@ PeerImp::doAccept() if (!socket_.is_open()) return; if (ec == boost::asio::error::operation_aborted) - return; + return tryAsyncShutdown(); if (ec) return fail("onWriteResponse", ec); if (write_buffer->size() == bytes_transferred) @@ -865,6 +911,10 @@ PeerImp::domain() const void PeerImp::doProtocolStart() { + // a shutdown was initiated before the handshare, there is nothing to do + if (shutdown_) + return tryAsyncShutdown(); + onReadMessage(error_code(), 0); // Send all the validator lists that have been loaded @@ -896,30 +946,45 @@ PeerImp::doProtocolStart() if (auto m = overlay_.getManifestsMessage()) send(m); - setTimer(); + setTimer(peerTimerInterval); } // Called repeatedly with protocol message data void PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred) { + XRPL_ASSERT( + strand_.running_in_this_thread(), + "ripple::PeerImp::onReadMessage : strand in this thread"); + + readPending_ = false; + if (!socket_.is_open()) return; - if (ec == boost::asio::error::operation_aborted) - return; - if (ec == boost::asio::error::eof) - { - JLOG(journal_.info()) << "EOF"; - return gracefulClose(); - } + if (ec) + { + if (ec == boost::asio::error::eof) + { + JLOG(journal_.debug()) << "EOF"; + return shutdown(); + } + + if (ec == boost::asio::error::operation_aborted) + return tryAsyncShutdown(); + return fail("onReadMessage", ec); + } + // we started shutdown, no reason to process further data + if (shutdown_) + return tryAsyncShutdown(); + if (auto stream = journal_.trace()) { - if (bytes_transferred > 0) - stream << "onReadMessage: " << bytes_transferred << " bytes"; - else - stream << "onReadMessage"; + stream << "onReadMessage: " + << (bytes_transferred > 0 + ? 
to_string(bytes_transferred) + " bytes" + : ""); } metrics_.recv.add_message(bytes_transferred); @@ -941,17 +1006,29 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred) 350ms, journal_); - if (ec) - return fail("onReadMessage", ec); if (!socket_.is_open()) return; - if (gracefulClose_) - return; + + // the error_code is produced by invokeProtocolMessage + // it could be due to a bad message + if (ec) + return fail("onReadMessage", ec); + if (bytes_consumed == 0) break; + read_buffer_.consume(bytes_consumed); } + // check if a shutdown was initiated while processing messages + if (shutdown_) + return tryAsyncShutdown(); + + readPending_ = true; + + XRPL_ASSERT( + !shutdownStarted_, "ripple::PeerImp::onReadMessage : shutdown started"); + // Timeout on writes only stream_.async_read_some( read_buffer_.prepare(std::max(Tuning::readBufferBytes, hint)), @@ -967,18 +1044,29 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred) void PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred) { + XRPL_ASSERT( + strand_.running_in_this_thread(), + "ripple::PeerImp::onWriteMessage : strand in this thread"); + + writePending_ = false; + if (!socket_.is_open()) return; - if (ec == boost::asio::error::operation_aborted) - return; + if (ec) + { + if (ec == boost::asio::error::operation_aborted) + return tryAsyncShutdown(); + return fail("onWriteMessage", ec); + } + if (auto stream = journal_.trace()) { - if (bytes_transferred > 0) - stream << "onWriteMessage: " << bytes_transferred << " bytes"; - else - stream << "onWriteMessage"; + stream << "onWriteMessage: " + << (bytes_transferred > 0 + ? to_string(bytes_transferred) + " bytes" + : ""); } metrics_.sent.add_message(bytes_transferred); @@ -987,8 +1075,17 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred) !send_queue_.empty(), "ripple::PeerImp::onWriteMessage : non-empty send buffer"); send_queue_.pop(); + + if (shutdown_) + return tryAsyncShutdown(); + if (!send_queue_.empty()) { + writePending_ = true; + XRPL_ASSERT( + !shutdownStarted_, + "ripple::PeerImp::onWriteMessage : shutdown started"); + // Timeout on writes only return boost::asio::async_write( stream_, @@ -1002,16 +1099,6 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred) std::placeholders::_1, std::placeholders::_2))); } - - if (gracefulClose_) - { - return stream_.async_shutdown(bind_executor( - strand_, - std::bind( - &PeerImp::onShutdown, - shared_from_this(), - std::placeholders::_1))); - } } //------------------------------------------------------------------------------ diff --git a/src/xrpld/overlay/detail/PeerImp.h b/src/xrpld/overlay/detail/PeerImp.h index 3d9a0c0b1e..c2221c136d 100644 --- a/src/xrpld/overlay/detail/PeerImp.h +++ b/src/xrpld/overlay/detail/PeerImp.h @@ -40,6 +40,7 @@ #include #include +#include #include #include #include @@ -49,6 +50,68 @@ namespace ripple { struct ValidatorBlobInfo; class SHAMap; +/** + * @class PeerImp + * @brief This class manages established peer-to-peer connections, handles + message exchange, monitors connection health, and graceful shutdown. + * + + * The PeerImp shutdown mechanism is a multi-stage process + * designed to ensure graceful connection termination while handling ongoing + * I/O operations safely. The shutdown can be initiated from multiple points + * and follows a deterministic state machine. 
+ * + * The shutdown process can be triggered from several entry points: + * - **External requests**: `stop()` method called by overlay management + * - **Error conditions**: `fail(error_code)` or `fail(string)` on protocol + * violations + * - **Timer expiration**: Various timeout scenarios (ping timeout, large send + * queue) + * - **Connection health**: Peer tracking divergence or unknown state timeouts + * + * The shutdown follows this progression: + * + * Normal Operation → shutdown() → tryAsyncShutdown() → onShutdown() → close() + * ↓ ↓ ↓ ↓ + * Set shutdown_ SSL graceful Timer cancel Socket close + * Cancel timer shutdown start & cleanup & metrics + * 5s safety timer Set shutdownStarted_ update + * + * Two primary flags coordinate the shutdown process: + * - `shutdown_`: Set when shutdown is requested + * - `shutdownStarted_`: Set when SSL shutdown begins + * + * The shutdown mechanism carefully coordinates with ongoing read/write + * operations: + * + * **Read Operations (`onReadMessage`)**: + * - Checks `shutdown_` flag after processing each message batch + * - If shutdown initiated during processing, calls `tryAsyncShutdown()` + * + * **Write Operations (`onWriteMessage`)**: + * - Checks `shutdown_` flag before queuing new writes + * - Calls `tryAsyncShutdown()` when shutdown flag detected + * + * Multiple timers require coordination during shutdown: + * 1. **Peer Timer**: Regular ping/pong timer cancelled immediately in + * `shutdown()` + * 2. **Shutdown Timer**: 5-second safety timer ensures shutdown completion + * 3. **Operation Cancellation**: All pending async operations are cancelled + * + * The shutdown implements fallback mechanisms: + * - **Graceful Path**: SSL shutdown → Socket close → Cleanup + * - **Forced Path**: If SSL shutdown fails or times out, proceeds to socket + * close + * - **Safety Timer**: 5-second timeout prevents hanging shutdowns + * + * All shutdown operations are serialized through the boost::asio::strand to + * ensure thread safety. The strand guarantees that shutdown state changes + * and I/O operation callbacks are executed sequentially. + * + * @note This class requires careful coordination between async operations, + * timer management, and shutdown procedures to ensure no resource leaks + * or hanging connections in high-throughput networking scenarios. + */ class PeerImp : public Peer, public std::enable_shared_from_this, public OverlayImpl::Child @@ -79,6 +142,8 @@ private: socket_type& socket_; stream_type& stream_; boost::asio::strand strand_; + + // Multi-purpose timer for peer activity monitoring and shutdown safety waitable_timer timer_; // Updated at each stage of the connection process to reflect @@ -95,7 +160,6 @@ private: std::atomic tracking_; clock_type::time_point trackingTime_; - bool detaching_ = false; // Node public key of peer. 
     PublicKey const publicKey_;
     std::string name_;
@@ -175,7 +239,19 @@ private:
     http_response_type response_;
     boost::beast::http::fields const& headers_;
     std::queue<std::shared_ptr<Message>> send_queue_;
-    bool gracefulClose_ = false;
+
+    // Primary shutdown flag set when shutdown is requested
+    bool shutdown_ = false;
+
+    // SSL shutdown coordination flag
+    bool shutdownStarted_ = false;
+
+    // Indicates a read operation is currently pending
+    bool readPending_ = false;
+
+    // Indicates a write operation is currently pending
+    bool writePending_ = false;
+
     int large_sendq_ = 0;
     std::unique_ptr<LoadEvent> load_event_;
     // The highest sequence of each PublisherList that has
@@ -425,9 +501,6 @@ public:
     bool
     isHighLatency() const override;
-    void
-    fail(std::string const& reason);
-
     bool
     compressionEnabled() const override
     {
@@ -441,32 +514,129 @@
     }
 
 private:
-    void
-    close();
-
+    /**
+     * @brief Handles a failure associated with a specific error code.
+     *
+     * This function is called when an operation fails with an error code. It
+     * logs the warning message and gracefully shuts down the connection.
+     *
+     * The function will do nothing if the connection is already closed or if a
+     * shutdown is already in progress.
+     *
+     * @param name The name of the operation that failed (e.g., "read",
+     *             "write").
+     * @param ec The error code associated with the failure.
+     * @note This function must be called from within the object's strand.
+     */
     void
     fail(std::string const& name, error_code ec);
+    /**
+     * @brief Handles a failure described by a reason string.
+     *
+     * This overload is used for logical errors or protocol violations not
+     * associated with a specific error code. It logs a warning with the
+     * given reason, then initiates a graceful shutdown.
+     *
+     * The function will do nothing if the connection is already closed or if a
+     * shutdown is already in progress.
+     *
+     * @param reason A descriptive string explaining the reason for the
+     *               failure.
+     * @note This function must be called from within the object's strand.
+     */
     void
-    gracefulClose();
+    fail(std::string const& reason);
+    /** @brief Initiates the peer disconnection sequence.
+     *
+     * This is the primary entry point to start closing a peer connection. It
+     * marks the peer for shutdown and cancels any outstanding asynchronous
+     * operations. This cancellation allows the graceful shutdown to proceed
+     * once the handlers for the cancelled operations have completed.
+     *
+     * @note This method must be called on the peer's strand.
+     */
     void
-    setTimer();
+    shutdown();
+    /** @brief Attempts to perform a graceful SSL shutdown if conditions are
+     * met.
+     *
+     * This helper function checks if the peer is in a state where a graceful
+     * SSL shutdown can be performed (i.e., shutdown has been requested and no
+     * I/O operations are currently in progress).
+     *
+     * @note This method must be called on the peer's strand.
+     */
     void
-    cancelTimer();
+    tryAsyncShutdown();
+
+    /**
+     * @brief Handles the completion of the asynchronous SSL shutdown.
+     *
+     * This function is the callback for the `async_shutdown` operation started
+     * in `shutdown()`. Its first action is to cancel the timer. It
+     * then inspects the error code to determine the outcome.
+     *
+     * Regardless of the result, this function proceeds to call `close()` to
+     * ensure the underlying socket is fully closed.
+     *
+     * @param ec The error code resulting from the `async_shutdown` operation.
+     */
+    void
+    onShutdown(error_code ec);
+
+    /**
+     * @brief Forcibly closes the underlying socket connection.
+     *
+     * This function provides the final, non-graceful shutdown of the peer
+     * connection. It ensures any pending timers are cancelled and then
+     * immediately closes the TCP socket, bypassing the SSL shutdown handshake.
+     *
+     * After closing, it notifies the overlay manager of the disconnection.
+     *
+     * @note This function must be called from within the object's strand.
+     */
+    void
+    close();
+
+    /**
+     * @brief Sets and starts the peer timer.
+     *
+     * This function starts the timer, which is used to detect inactivity
+     * and prevent stalled connections. It sets the timer to expire after the
+     * given `interval` (for example, `peerTimerInterval`).
+     *
+     * @note This function will terminate the connection in case of any errors.
+     */
+    void
+    setTimer(std::chrono::seconds interval);
+
+    /**
+     * @brief Handles the expiration of the peer activity timer.
+     *
+     * This callback is invoked when the timer set by `setTimer` expires. It
+     * checks the peer connection for various timeout and health conditions.
+     *
+     * @param ec The error code associated with the timer's expiration.
+     * `operation_aborted` is expected if the timer was cancelled.
+     */
+    void
+    onTimer(error_code const& ec);
+
+    /**
+     * @brief Cancels any pending wait on the peer activity timer.
+     *
+     * This function is called to stop the timer. It gracefully manages any
+     * errors that might occur during the cancellation process.
+     */
+    void
+    cancelTimer() noexcept;
 
     static std::string
     makePrefix(id_t id);
 
-    // Called when the timer wait completes
-    void
-    onTimer(boost::system::error_code const& ec);
-
-    // Called when SSL shutdown completes
-    void
-    onShutdown(error_code ec);
-
     void
     doAccept();
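A minimal, standalone sketch (not part of the patch) of the I/O-gated shutdown sequencing that shutdown()/tryAsyncShutdown() implement above: shutdown intent is recorded once, pending work is cancelled, and the graceful shutdown handshake is deferred until no read or write handler is outstanding. The Connection class and its method names are illustrative stand-ins, not the real boost::asio-based types.

#include <iostream>

// Models the flags used in this patch: shutdown_ records intent,
// shutdownStarted_ guarantees the handshake is started at most once,
// readPending_/writePending_ gate it until all I/O handlers have drained.
class Connection
{
public:
    void
    startRead()
    {
        if (shutdown_)
            return;
        readPending_ = true;  // stand-in for issuing async_read_some
    }

    void
    onReadComplete()  // completion handler for the pending read
    {
        readPending_ = false;
        if (shutdown_)
            return tryAsyncShutdown();
        startRead();  // the real code re-arms the next read here
    }

    void
    shutdown()  // entry point used by stop()/fail() in the patch
    {
        if (shutdown_)
            return;
        shutdown_ = true;
        // The real code cancels the socket here so pending handlers complete
        // promptly (with operation_aborted) and funnel into tryAsyncShutdown.
        tryAsyncShutdown();
    }

private:
    void
    tryAsyncShutdown()
    {
        if (!shutdown_ || shutdownStarted_)
            return;
        if (readPending_ || writePending_)
            return;  // wait: the last completion handler will call back in
        shutdownStarted_ = true;
        // Stand-in for stream_.async_shutdown(...) -> onShutdown() -> close().
        std::cout << "starting SSL shutdown handshake\n";
    }

    bool shutdown_ = false;
    bool shutdownStarted_ = false;
    bool readPending_ = false;
    bool writePending_ = false;
};

int
main()
{
    Connection c;
    c.startRead();       // a read is in flight
    c.shutdown();        // intent recorded, handshake deferred
    c.onReadComplete();  // last pending I/O drains, handshake starts
}

The at-most-once guard (shutdownStarted_) is what keeps the shutdown handshake from being issued twice when several cancelled handlers drain at the same time; the pending-I/O check is what prevents it from racing an outstanding read or write.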