introduces hybrid connection timeout to outbound connection attempts

This commit is contained in:
Vito
2025-09-10 13:48:25 +02:00
parent 1738bbccc8
commit 90c74454c8
3 changed files with 273 additions and 46 deletions

View File

@@ -47,6 +47,7 @@ ConnectAttempt::ConnectAttempt(
, usage_(usage)
, strand_(boost::asio::make_strand(io_context))
, timer_(io_context)
, stepTimer_(io_context)
, stream_ptr_(std::make_unique<stream_type>(
socket_type(std::forward<boost::asio::io_context&>(io_context)),
*context))
@@ -91,7 +92,7 @@ ConnectAttempt::run()
ioPending_ = true;
// Allow up to connectTimeout_ seconds to establish remote peer connection
setTimer();
setTimer(ConnectionStep::TcpConnect);
stream_.next_layer().async_connect(
remote_endpoint_,
@@ -206,24 +207,70 @@ ConnectAttempt::fail(std::string const& name, error_code ec)
}
void
ConnectAttempt::setTimer()
ConnectAttempt::setTimer(ConnectionStep step)
{
currentStep_ = step;
// Set global timer (only if not already set)
if (timer_.expiry() == std::chrono::steady_clock::time_point{})
{
try
{
timer_.expires_after(connectTimeout);
timer_.async_wait(boost::asio::bind_executor(
strand_,
std::bind(
&ConnectAttempt::onTimer,
shared_from_this(),
std::placeholders::_1)));
}
catch (std::exception const& ex)
{
JLOG(journal_.error()) << "setTimer (global): " << ex.what();
return close();
}
}
// Set step-specific timer
try
{
timer_.expires_after(connectTimeout);
std::chrono::seconds stepTimeout;
switch (step)
{
case ConnectionStep::TcpConnect:
stepTimeout = StepTimeouts::tcpConnect;
break;
case ConnectionStep::TlsHandshake:
stepTimeout = StepTimeouts::tlsHandshake;
break;
case ConnectionStep::HttpWrite:
stepTimeout = StepTimeouts::httpWrite;
break;
case ConnectionStep::HttpRead:
stepTimeout = StepTimeouts::httpRead;
break;
case ConnectionStep::Complete:
return; // No timer needed for complete step
}
// call to expires_after cancels previous timer
stepTimer_.expires_after(stepTimeout);
stepTimer_.async_wait(boost::asio::bind_executor(
strand_,
std::bind(
&ConnectAttempt::onTimer,
shared_from_this(),
std::placeholders::_1)));
JLOG(journal_.trace()) << "setTimer: " << stepToString(step)
<< " timeout=" << stepTimeout.count() << "s";
}
catch (std::exception const& ex)
{
JLOG(journal_.error()) << "setTimer: " << ex.what();
JLOG(journal_.error())
<< "setTimer (step " << stepToString(step) << "): " << ex.what();
return close();
}
timer_.async_wait(boost::asio::bind_executor(
strand_,
std::bind(
&ConnectAttempt::onTimer,
shared_from_this(),
std::placeholders::_1)));
}
void
@@ -232,6 +279,7 @@ ConnectAttempt::cancelTimer()
try
{
timer_.cancel();
stepTimer_.cancel();
}
catch (boost::system::system_error const&)
{
@@ -256,7 +304,26 @@ ConnectAttempt::onTimer(error_code ec)
return close();
}
JLOG(journal_.debug()) << "onTimer: Timeout";
// Determine which timer expired by checking their expiry times
auto const now = std::chrono::steady_clock::now();
bool globalExpired = (timer_.expiry() <= now);
bool stepExpired = (stepTimer_.expiry() <= now);
if (globalExpired)
{
JLOG(journal_.debug())
<< "onTimer: Global timeout; step: " << stepToString(currentStep_);
}
else if (stepExpired)
{
JLOG(journal_.debug())
<< "onTimer: Step timeout; step: " << stepToString(currentStep_);
}
else
{
JLOG(journal_.warn()) << "onTimer: Unexpected timer callback";
}
close();
}
@@ -286,6 +353,8 @@ ConnectAttempt::onConnect(error_code ec)
ioPending_ = true;
setTimer(ConnectionStep::TlsHandshake);
stream_.set_verify_mode(boost::asio::ssl::verify_none);
stream_.async_handshake(
boost::asio::ssl::stream_base::client,
@@ -345,6 +414,8 @@ ConnectAttempt::onHandshake(error_code ec)
ioPending_ = true;
setTimer(ConnectionStep::HttpWrite);
boost::beast::http::async_write(
stream_,
req_,
@@ -374,6 +445,8 @@ ConnectAttempt::onWrite(error_code ec)
ioPending_ = true;
setTimer(ConnectionStep::HttpRead);
boost::beast::http::async_read(
stream_,
read_buf_,
@@ -391,6 +464,7 @@ ConnectAttempt::onRead(error_code ec)
{
cancelTimer();
ioPending_ = false;
currentStep_ = ConnectionStep::Complete;
if (ec)
{

View File

@@ -26,99 +26,248 @@
namespace ripple {
/** Manages an outbound connection attempt. */
/**
* @class ConnectAttempt
* @brief Manages outbound peer connection attempts with comprehensive timeout
* handling
*
* The ConnectAttempt class handles the complete lifecycle of establishing an
* outbound connection to a peer in the XRPL network. It implements a
* sophisticated dual-timer system that provides both global timeout protection
* and per-step timeout diagnostics.
*
* The connection establishment follows these steps:
* 1. **TCP Connect**: Establish basic network connection
* 2. **TLS Handshake**: Negotiate SSL/TLS encryption
* 3. **HTTP Write**: Send peer handshake request
* 4. **HTTP Read**: Receive and validate peer response
* 5. **Complete**: Connection successfully established
*
* Uses a hybrid timeout approach:
* - **Global Timer**: Hard limit (20s) for entire connection process
* - **Step Timers**: Individual timeouts for each connection phase
*
* - All errors result in connection termination
*
* All operations are serialized using boost::asio::strand to ensure thread
* safety. The class is designed to be used exclusively within the ASIO event
* loop.
*
* @note This class should not be used directly. It is managed by OverlayImpl
* as part of the peer discovery and connection management system.
*
*/
class ConnectAttempt : public OverlayImpl::Child,
public std::enable_shared_from_this<ConnectAttempt>
{
private:
using error_code = boost::system::error_code;
using endpoint_type = boost::asio::ip::tcp::endpoint;
using request_type =
boost::beast::http::request<boost::beast::http::empty_body>;
using response_type =
boost::beast::http::response<boost::beast::http::dynamic_body>;
using socket_type = boost::asio::ip::tcp::socket;
using middle_type = boost::beast::tcp_stream;
using stream_type = boost::beast::ssl_stream<middle_type>;
using shared_context = std::shared_ptr<boost::asio::ssl::context>;
static constexpr std::chrono::seconds connectTimeout{15};
/**
* @enum ConnectionStep
* @brief Represents the current phase of the connection establishment
* process
*
* Used for tracking progress and providing detailed timeout diagnostics.
* Each step has its own timeout value defined in StepTimeouts.
*/
enum class ConnectionStep {
TcpConnect, // Establishing TCP connection to remote peer
TlsHandshake, // Performing SSL/TLS handshake
HttpWrite, // Sending HTTP upgrade request
HttpRead, // Reading HTTP upgrade response
Complete // Connection successfully established
};
// Global timeout for entire connection process (hard limit)
static constexpr std::chrono::seconds connectTimeout{20};
/**
* @struct StepTimeouts
* @brief Defines timeout values for each connection step
*
* These timeouts are designed to detect slow individual phases while
* allowing the global timeout to enforce the overall time limit.
*/
struct StepTimeouts
{
static constexpr std::chrono::seconds tcpConnect{
8}; // TCP connection timeout
static constexpr std::chrono::seconds tlsHandshake{
5}; // SSL handshake timeout
static constexpr std::chrono::seconds httpWrite{
2}; // HTTP write timeout
static constexpr std::chrono::seconds httpRead{3}; // HTTP read timeout
};
// Core application and networking components
Application& app_;
std::uint32_t const id_;
Peer::id_t const id_;
beast::WrappedSink sink_;
beast::Journal const journal_;
endpoint_type remote_endpoint_;
Resource::Consumer usage_;
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
boost::asio::basic_waitable_timer<std::chrono::steady_clock> timer_;
std::unique_ptr<stream_type> stream_ptr_;
boost::asio::basic_waitable_timer<std::chrono::steady_clock> stepTimer_;
std::unique_ptr<stream_type> stream_ptr_; // SSL stream (owned)
socket_type& socket_;
stream_type& stream_;
boost::beast::multi_buffer read_buf_;
response_type response_;
std::shared_ptr<PeerFinder::Slot> slot_;
request_type req_;
bool shutdown_ = false;
bool ioPending_ = false;
bool shutdownStarted_ = false;
bool handshakeComplete_ = false;
bool shutdown_ = false; // Shutdown has been initiated
bool ioPending_ = false; // Async I/O operation in progress
bool shutdownStarted_ = false; // SSL shutdown in progress
bool handshakeComplete_ = false; // SSL handshake completed successfully
ConnectionStep currentStep_ =
ConnectionStep::TcpConnect; // Current connection phase
public:
/**
* @brief Construct a new ConnectAttempt object
*
* @param app Application context providing configuration and services
* @param io_context ASIO I/O context for async operations
* @param remote_endpoint Target peer endpoint to connect to
* @param usage Resource usage tracker for rate limiting
* @param context Shared SSL context for encryption
* @param id Unique peer identifier for this connection attempt
* @param slot PeerFinder slot representing this connection
* @param journal Logging interface for diagnostics
* @param overlay Parent overlay manager
*
* @note The constructor only initializes the object. Call run() to begin
* the actual connection attempt.
*/
ConnectAttempt(
Application& app,
boost::asio::io_context& io_context,
endpoint_type const& remote_endpoint,
Resource::Consumer usage,
shared_context const& context,
std::uint32_t id,
Peer::id_t id,
std::shared_ptr<PeerFinder::Slot> const& slot,
beast::Journal journal,
OverlayImpl& overlay);
~ConnectAttempt();
/**
* @brief Stop the connection attempt
*
* This method is thread-safe and can be called from any thread.
*/
void
stop() override;
/**
* @brief Begin the connection attempt
*
* This method is thread-safe and posts to the strand if needed.
*/
void
run();
private:
/**
* @brief Set timers for the specified connection step
*
* @param step The connection step to set timers for
*
* Sets both the step-specific timer and the global timer (if not already
* set).
*/
void
setTimer();
setTimer(ConnectionStep step);
/**
* @brief Cancel both global and step timers
*
* Used during cleanup and when connection completes successfully.
* Exceptions from timer cancellation are safely ignored.
*/
void
cancelTimer();
/**
* @brief Handle timer expiration events
*
* @param ec Error code from timer operation
*
* Determines which timer expired (global vs step) and logs appropriate
* diagnostic information before terminating the connection.
*/
void
onTimer(error_code ec);
void
onConnect(error_code ec);
void
onHandshake(error_code ec);
void
onWrite(error_code ec);
void
onRead(error_code ec);
void
fail(std::string const& reason);
void
fail(std::string const& name, error_code ec);
void
shutdown();
void
tryAsyncShutdown();
void
onShutdown(error_code ec);
void
close();
// Connection phase handlers
void
onConnect(error_code ec); // TCP connection completion handler
void
onHandshake(error_code ec); // TLS handshake completion handler
void
onWrite(error_code ec); // HTTP write completion handler
void
onRead(error_code ec); // HTTP read completion handler
// Error and cleanup handlers
void
fail(std::string const& reason); // Fail with custom reason
void
fail(std::string const& name, error_code ec); // Fail with system error
void
shutdown(); // Initiate graceful shutdown
void
tryAsyncShutdown(); // Attempt async SSL shutdown
void
onShutdown(error_code ec); // SSL shutdown completion handler
void
close(); // Force close socket
/**
* @brief Process the HTTP upgrade response from peer
*
* Validates the peer's response, extracts protocol information,
* verifies handshake, and either creates a PeerImp or handles
* redirect responses.
*/
void
processResponse();
static std::string
stepToString(ConnectionStep step)
{
switch (step)
{
case ConnectAttempt::ConnectionStep::TcpConnect:
return "TcpConnect";
case ConnectAttempt::ConnectionStep::TlsHandshake:
return "TlsHandshake";
case ConnectAttempt::ConnectionStep::HttpWrite:
return "HttpWrite";
case ConnectAttempt::ConnectionStep::HttpRead:
return "HttpRead";
case ConnectAttempt::ConnectionStep::Complete:
return "Complete";
}
return "Unknown";
};
template <class = void>
static boost::asio::ip::tcp::endpoint
parse_endpoint(std::string const& s, boost::system::error_code& ec)

View File

@@ -733,6 +733,10 @@ PeerImp::setTimer(std::chrono::seconds interval)
void
PeerImp::onTimer(error_code const& ec)
{
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::PeerImp::onTimer : strand in this thread");
if (!socket_.is_open())
return;