Compare commits

...

39 Commits

Author SHA1 Message Date
Bronek Kozicki
990d8a28bc Merge branch 'develop' into tapanito/bugfix/graceful-disconect 2025-09-15 21:01:12 +01:00
Vito Tumas
f299db4172 Merge branch 'develop' into tapanito/bugfix/graceful-disconect 2025-09-15 17:54:35 +02:00
Vito
45bb750559 removes state flags, and use the step instead 2025-09-15 16:40:10 +02:00
Vito Tumas
195f6befaf Merge branch 'develop' into tapanito/bugfix/graceful-disconect 2025-09-12 10:14:03 +02:00
Vito
61c66db8c6 documents peerImp shutdown mechanism 2025-09-10 16:30:29 +02:00
Vito
7b593c586b adjust http write timeout 2025-09-10 14:02:34 +02:00
Vito
06460e2238 increases tls handshake timeout 2025-09-10 13:51:49 +02:00
Vito
90c74454c8 introduces hybrid connection timeout to outbound connection attempts 2025-09-10 13:48:37 +02:00
Vito Tumas
1738bbccc8 Merge branch 'develop' into tapanito/bugfix/graceful-disconect 2025-09-05 11:56:34 +02:00
Vito
d68ca81b71 adds missing writePending flag 2025-09-04 15:34:04 +02:00
Vito Tumas
273e8b5365 Merge branch 'develop' into tapanito/bugfix/graceful-disconect 2025-09-04 12:41:59 +02:00
Vito
c487b41d1f reduce log verbosity 2025-09-04 12:40:33 +02:00
Vito
a5bdb3cef5 correctly handle shutdown when tls was not established yet 2025-09-04 12:40:11 +02:00
Vito
80c8a2e969 improves logging of failed connect attempts 2025-09-04 11:47:11 +02:00
Vito
c273c14b3e updates peerImp to correctly cancel protocol start if shutdown was initiated 2025-09-04 11:46:39 +02:00
Vito
3e406e1e13 changes connectAttempt timer to give 15 seconds for the whole connection process 2025-09-04 11:45:58 +02:00
Vito
15ae952514 cleans up connectAttempt 2025-09-04 11:44:47 +02:00
Vito
62e10d09c6 restores shutdownStarted_ 2025-08-27 12:19:44 +02:00
Vito
5984ef1400 Merge branch 'develop' into tapanito/bugfix/graceful-disconect 2025-08-27 12:18:06 +02:00
Vito
a6eb8379f0 simplifies connect attempt async shutdown 2025-08-27 11:44:26 +02:00
Vito
df82fe3311 address review comments 2025-08-26 14:21:27 +02:00
Vito
a34d2139a7 removes unused imports 2025-08-21 11:00:12 +02:00
Vito Tumas
985c199c11 Merge branch 'develop' into tapanito/bugfix/graceful-disconect 2025-08-21 10:56:26 +02:00
Vito
5b27b7e429 adds graceful shutdown to connect attempt 2025-08-21 10:54:17 +02:00
Vito
d9884eefa1 improves PeerImp logging 2025-08-21 10:54:00 +02:00
Vito
28870883bf cleans up assets in PeerImp 2025-08-20 12:17:46 +02:00
Vito
3d07dba2ce W.I.P. correct async shutdown 2025-08-20 11:20:40 +02:00
Vito
11152e39e6 prevent further async operations once shutdown initaited 2025-08-19 15:59:50 +02:00
Vito
7e41aaeffb adds graceful shutdown for outbound connection attempt 2025-08-19 15:59:49 +02:00
Bart
491bc19f5d docs: Updates list of maintainers and reviewers (#5687) 2025-08-19 15:59:43 +02:00
Vito
f0c5af9b99 shutdown the connection after reading EOF 2025-08-15 13:07:17 +02:00
Vito Tumas
7587d5fb05 Merge branch 'develop' into tapanito/bugfix/graceful-disconect 2025-08-14 19:09:20 +02:00
Vito Tumas
c665eb3009 Merge branch 'develop' into tapanito/bugfix/graceful-disconect 2025-08-14 16:20:14 +02:00
Vito
8a238aa52a removes graceful close 2025-08-14 16:07:20 +02:00
Vito
5c6a071c01 log all tcp errors onShutdown 2025-08-14 12:20:45 +02:00
Vito
474514b2b8 improves code style 2025-08-13 17:14:10 +02:00
Vito
72cec6614c updates peer logic to shutdown asynchornously 2025-08-13 16:42:07 +02:00
Vito
068bcd0399 adds a check whether the socket is open before sending a message 2025-08-12 19:16:12 +02:00
Vito
076371746b adds graceful peer disconnection 2025-08-12 19:02:56 +02:00
4 changed files with 966 additions and 335 deletions

View File

@@ -24,6 +24,8 @@
#include <xrpl/json/json_reader.h> #include <xrpl/json/json_reader.h>
#include <sstream>
namespace ripple { namespace ripple {
ConnectAttempt::ConnectAttempt( ConnectAttempt::ConnectAttempt(
@@ -45,6 +47,7 @@ ConnectAttempt::ConnectAttempt(
, usage_(usage) , usage_(usage)
, strand_(boost::asio::make_strand(io_context)) , strand_(boost::asio::make_strand(io_context))
, timer_(io_context) , timer_(io_context)
, stepTimer_(io_context)
, stream_ptr_(std::make_unique<stream_type>( , stream_ptr_(std::make_unique<stream_type>(
socket_type(std::forward<boost::asio::io_context&>(io_context)), socket_type(std::forward<boost::asio::io_context&>(io_context)),
*context)) *context))
@@ -52,14 +55,14 @@ ConnectAttempt::ConnectAttempt(
, stream_(*stream_ptr_) , stream_(*stream_ptr_)
, slot_(slot) , slot_(slot)
{ {
JLOG(journal_.debug()) << "Connect " << remote_endpoint;
} }
ConnectAttempt::~ConnectAttempt() ConnectAttempt::~ConnectAttempt()
{ {
// slot_ will be null if we successfully connected
// and transferred ownership to a PeerImp
if (slot_ != nullptr) if (slot_ != nullptr)
overlay_.peerFinder().on_closed(slot_); overlay_.peerFinder().on_closed(slot_);
JLOG(journal_.trace()) << "~ConnectAttempt";
} }
void void
@@ -68,16 +71,29 @@ ConnectAttempt::stop()
if (!strand_.running_in_this_thread()) if (!strand_.running_in_this_thread())
return boost::asio::post( return boost::asio::post(
strand_, std::bind(&ConnectAttempt::stop, shared_from_this())); strand_, std::bind(&ConnectAttempt::stop, shared_from_this()));
if (socket_.is_open())
{ if (!socket_.is_open())
JLOG(journal_.debug()) << "Stop"; return;
}
close(); JLOG(journal_.debug()) << "stop: Stop";
shutdown();
} }
void void
ConnectAttempt::run() ConnectAttempt::run()
{ {
if (!strand_.running_in_this_thread())
return boost::asio::post(
strand_, std::bind(&ConnectAttempt::run, shared_from_this()));
JLOG(journal_.debug()) << "run: connecting to " << remote_endpoint_;
ioPending_ = true;
// Arm the global timer (connectTimeout) and the TcpConnect step timer
setTimer(ConnectionStep::TcpConnect);
stream_.next_layer().async_connect( stream_.next_layer().async_connect(
remote_endpoint_, remote_endpoint_,
boost::asio::bind_executor( boost::asio::bind_executor(
@@ -90,61 +106,177 @@ ConnectAttempt::run()
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
/**
 * Begin shutting this connection attempt down.
 *
 * Must run on the strand. Nothing happens when the socket is already
 * closed. Otherwise the shutdown flag is raised, outstanding operations on
 * the lowest stream layer are cancelled, and tryAsyncShutdown() decides
 * whether the connection can be torn down right away.
 */
void
ConnectAttempt::shutdown()
{
    XRPL_ASSERT(
        strand_.running_in_this_thread(),
        "ripple::ConnectAttempt::shutdown: strand in this thread");

    if (socket_.is_open())
    {
        shutdown_ = true;

        // Abort whatever asynchronous operation is outstanding on the
        // underlying socket before attempting the shutdown itself.
        boost::beast::get_lowest_layer(stream_).cancel();

        tryAsyncShutdown();
    }
}
void
ConnectAttempt::tryAsyncShutdown()
{
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::ConnectAttempt::tryAsyncShutdown : strand in this thread");
if (!shutdown_ || currentStep_ == ConnectionStep::ShutdownStarted)
return;
if (ioPending_)
return;
// gracefully shutdown the SSL socket, performing a shutdown handshake
if (currentStep_ != ConnectionStep::TcpConnect &&
currentStep_ != ConnectionStep::TlsHandshake)
{
setTimer(ConnectionStep::ShutdownStarted);
return stream_.async_shutdown(bind_executor(
strand_,
std::bind(
&ConnectAttempt::onShutdown,
shared_from_this(),
std::placeholders::_1)));
}
close();
}
/**
 * Completion handler for the asynchronous SSL shutdown.
 *
 * Cancels the timers, optionally logs the error, and closes the socket.
 *
 * The following outcomes are not logged:
 *  - eof: the stream was cleanly closed
 *  - operation_aborted: an expired timer (slow shutdown)
 *  - "application data after close notify": benign SSL shutdown condition
 *
 * Any other error is logged at debug level. That includes stream_truncated
 * (the TCP connection closed without a shutdown handshake, e.g. a peer that
 * does not disconnect gracefully) and broken_pipe (the peer is gone).
 *
 * @param ec Result of the shutdown operation
 */
void
ConnectAttempt::onShutdown(error_code ec)
{
    cancelTimer();

    // Evaluated lazily so ec.message() is only built when it is needed.
    auto const isExpected = [&ec]() {
        return ec == boost::asio::error::eof ||
            ec == boost::asio::error::operation_aborted ||
            ec.message().find("application data after close notify") !=
            std::string::npos;
    };

    if (ec && !isExpected())
    {
        JLOG(journal_.debug()) << "onShutdown: " << ec.message();
    }

    close();
}
void void
ConnectAttempt::close() ConnectAttempt::close()
{ {
XRPL_ASSERT( XRPL_ASSERT(
strand_.running_in_this_thread(), strand_.running_in_this_thread(),
"ripple::ConnectAttempt::close : strand in this thread"); "ripple::ConnectAttempt::close : strand in this thread");
if (socket_.is_open()) if (!socket_.is_open())
{ return;
try
{
timer_.cancel();
socket_.close();
}
catch (boost::system::system_error const&)
{
// ignored
}
JLOG(journal_.debug()) << "Closed"; cancelTimer();
}
error_code ec;
socket_.close(ec);
} }
void void
ConnectAttempt::fail(std::string const& reason) ConnectAttempt::fail(std::string const& reason)
{ {
JLOG(journal_.debug()) << reason; JLOG(journal_.debug()) << reason;
close(); shutdown();
} }
void void
ConnectAttempt::fail(std::string const& name, error_code ec) ConnectAttempt::fail(std::string const& name, error_code ec)
{ {
JLOG(journal_.debug()) << name << ": " << ec.message(); JLOG(journal_.debug()) << name << ": " << ec.message();
close(); shutdown();
} }
void void
ConnectAttempt::setTimer() ConnectAttempt::setTimer(ConnectionStep step)
{ {
try currentStep_ = step;
// Set global timer (only if not already set)
if (timer_.expiry() == std::chrono::steady_clock::time_point{})
{ {
timer_.expires_after(std::chrono::seconds(15)); try
} {
catch (boost::system::system_error const& e) timer_.expires_after(connectTimeout);
{ timer_.async_wait(boost::asio::bind_executor(
JLOG(journal_.error()) << "setTimer: " << e.code(); strand_,
return; std::bind(
&ConnectAttempt::onTimer,
shared_from_this(),
std::placeholders::_1)));
}
catch (std::exception const& ex)
{
JLOG(journal_.error()) << "setTimer (global): " << ex.what();
return close();
}
} }
timer_.async_wait(boost::asio::bind_executor( // Set step-specific timer
strand_, try
std::bind( {
&ConnectAttempt::onTimer, std::chrono::seconds stepTimeout;
shared_from_this(), switch (step)
std::placeholders::_1))); {
case ConnectionStep::TcpConnect:
stepTimeout = StepTimeouts::tcpConnect;
break;
case ConnectionStep::TlsHandshake:
stepTimeout = StepTimeouts::tlsHandshake;
break;
case ConnectionStep::HttpWrite:
stepTimeout = StepTimeouts::httpWrite;
break;
case ConnectionStep::HttpRead:
stepTimeout = StepTimeouts::httpRead;
break;
case ConnectionStep::ShutdownStarted:
stepTimeout = StepTimeouts::tlsShutdown;
break;
case ConnectionStep::Complete:
case ConnectionStep::Init:
return; // No timer needed for init or complete step
}
// call to expires_after cancels previous timer
stepTimer_.expires_after(stepTimeout);
stepTimer_.async_wait(boost::asio::bind_executor(
strand_,
std::bind(
&ConnectAttempt::onTimer,
shared_from_this(),
std::placeholders::_1)));
JLOG(journal_.trace()) << "setTimer: " << stepToString(step)
<< " timeout=" << stepTimeout.count() << "s";
}
catch (std::exception const& ex)
{
JLOG(journal_.error())
<< "setTimer (step " << stepToString(step) << "): " << ex.what();
return close();
}
} }
void void
@@ -153,6 +285,7 @@ ConnectAttempt::cancelTimer()
try try
{ {
timer_.cancel(); timer_.cancel();
stepTimer_.cancel();
} }
catch (boost::system::system_error const&) catch (boost::system::system_error const&)
{ {
@@ -165,34 +298,69 @@ ConnectAttempt::onTimer(error_code ec)
{ {
if (!socket_.is_open()) if (!socket_.is_open())
return; return;
if (ec == boost::asio::error::operation_aborted)
return;
if (ec) if (ec)
{ {
// do not initiate shutdown, timers are frequently cancelled
if (ec == boost::asio::error::operation_aborted)
return;
// This should never happen // This should never happen
JLOG(journal_.error()) << "onTimer: " << ec.message(); JLOG(journal_.error()) << "onTimer: " << ec.message();
return close(); return close();
} }
fail("Timeout");
// Determine which timer expired by checking their expiry times
auto const now = std::chrono::steady_clock::now();
bool globalExpired = (timer_.expiry() <= now);
bool stepExpired = (stepTimer_.expiry() <= now);
if (globalExpired)
{
JLOG(journal_.debug())
<< "onTimer: Global timeout; step: " << stepToString(currentStep_);
}
else if (stepExpired)
{
JLOG(journal_.debug())
<< "onTimer: Step timeout; step: " << stepToString(currentStep_);
}
else
{
JLOG(journal_.warn()) << "onTimer: Unexpected timer callback";
}
close();
} }
void void
ConnectAttempt::onConnect(error_code ec) ConnectAttempt::onConnect(error_code ec)
{ {
cancelTimer(); ioPending_ = false;
if (ec == boost::asio::error::operation_aborted)
return;
endpoint_type local_endpoint;
if (!ec)
local_endpoint = socket_.local_endpoint(ec);
if (ec) if (ec)
{
if (ec == boost::asio::error::operation_aborted)
return tryAsyncShutdown();
return fail("onConnect", ec); return fail("onConnect", ec);
}
if (!socket_.is_open()) if (!socket_.is_open())
return; return;
JLOG(journal_.trace()) << "onConnect";
setTimer(); // check if connection has really been established
socket_.local_endpoint(ec);
if (ec)
return fail("onConnect", ec);
if (shutdown_)
return tryAsyncShutdown();
ioPending_ = true;
setTimer(ConnectionStep::TlsHandshake);
stream_.set_verify_mode(boost::asio::ssl::verify_none); stream_.set_verify_mode(boost::asio::ssl::verify_none);
stream_.async_handshake( stream_.async_handshake(
boost::asio::ssl::stream_base::client, boost::asio::ssl::stream_base::client,
@@ -207,25 +375,30 @@ ConnectAttempt::onConnect(error_code ec)
void void
ConnectAttempt::onHandshake(error_code ec) ConnectAttempt::onHandshake(error_code ec)
{ {
cancelTimer(); ioPending_ = false;
if (!socket_.is_open())
return; if (ec)
if (ec == boost::asio::error::operation_aborted) {
return; if (ec == boost::asio::error::operation_aborted)
endpoint_type local_endpoint; return tryAsyncShutdown();
if (!ec)
local_endpoint = socket_.local_endpoint(ec); return fail("onHandshake", ec);
}
auto const local_endpoint = socket_.local_endpoint(ec);
if (ec) if (ec)
return fail("onHandshake", ec); return fail("onHandshake", ec);
JLOG(journal_.trace()) << "onHandshake";
setTimer(ConnectionStep::HttpWrite);
// check if we connected to ourselves
if (!overlay_.peerFinder().onConnected( if (!overlay_.peerFinder().onConnected(
slot_, beast::IPAddressConversion::from_asio(local_endpoint))) slot_, beast::IPAddressConversion::from_asio(local_endpoint)))
return fail("Duplicate connection"); return fail("Self connection");
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_); auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
if (!sharedValue) if (!sharedValue)
return close(); // makeSharedValue logs return shutdown(); // makeSharedValue logs
req_ = makeRequest( req_ = makeRequest(
!overlay_.peerFinder().config().peerPrivate, !overlay_.peerFinder().config().peerPrivate,
@@ -242,7 +415,11 @@ ConnectAttempt::onHandshake(error_code ec)
remote_endpoint_.address(), remote_endpoint_.address(),
app_); app_);
setTimer(); if (shutdown_)
return tryAsyncShutdown();
ioPending_ = true;
boost::beast::http::async_write( boost::beast::http::async_write(
stream_, stream_,
req_, req_,
@@ -257,13 +434,23 @@ ConnectAttempt::onHandshake(error_code ec)
void void
ConnectAttempt::onWrite(error_code ec) ConnectAttempt::onWrite(error_code ec)
{ {
cancelTimer(); ioPending_ = false;
if (!socket_.is_open())
return;
if (ec == boost::asio::error::operation_aborted)
return;
if (ec) if (ec)
{
if (ec == boost::asio::error::operation_aborted)
return tryAsyncShutdown();
return fail("onWrite", ec); return fail("onWrite", ec);
}
if (shutdown_)
return tryAsyncShutdown();
ioPending_ = true;
setTimer(ConnectionStep::HttpRead);
boost::beast::http::async_read( boost::beast::http::async_read(
stream_, stream_,
read_buf_, read_buf_,
@@ -280,39 +467,27 @@ void
ConnectAttempt::onRead(error_code ec) ConnectAttempt::onRead(error_code ec)
{ {
cancelTimer(); cancelTimer();
ioPending_ = false;
currentStep_ = ConnectionStep::Complete;
if (!socket_.is_open())
return;
if (ec == boost::asio::error::operation_aborted)
return;
if (ec == boost::asio::error::eof)
{
JLOG(journal_.info()) << "EOF";
setTimer();
return stream_.async_shutdown(boost::asio::bind_executor(
strand_,
std::bind(
&ConnectAttempt::onShutdown,
shared_from_this(),
std::placeholders::_1)));
}
if (ec) if (ec)
return fail("onRead", ec);
processResponse();
}
void
ConnectAttempt::onShutdown(error_code ec)
{
cancelTimer();
if (!ec)
{ {
JLOG(journal_.error()) << "onShutdown: expected error condition"; if (ec == boost::asio::error::eof)
return close(); {
JLOG(journal_.debug()) << "EOF";
return shutdown();
}
if (ec == boost::asio::error::operation_aborted)
return tryAsyncShutdown();
return fail("onRead", ec);
} }
if (ec != boost::asio::error::eof)
return fail("onShutdown", ec); if (shutdown_)
close(); return tryAsyncShutdown();
processResponse();
} }
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
@@ -320,48 +495,69 @@ ConnectAttempt::onShutdown(error_code ec)
void void
ConnectAttempt::processResponse() ConnectAttempt::processResponse()
{ {
if (response_.result() == boost::beast::http::status::service_unavailable)
{
Json::Value json;
Json::Reader r;
std::string s;
s.reserve(boost::asio::buffer_size(response_.body().data()));
for (auto const buffer : response_.body().data())
s.append(
static_cast<char const*>(buffer.data()),
boost::asio::buffer_size(buffer));
auto const success = r.parse(s, json);
if (success)
{
if (json.isObject() && json.isMember("peer-ips"))
{
Json::Value const& ips = json["peer-ips"];
if (ips.isArray())
{
std::vector<boost::asio::ip::tcp::endpoint> eps;
eps.reserve(ips.size());
for (auto const& v : ips)
{
if (v.isString())
{
error_code ec;
auto const ep = parse_endpoint(v.asString(), ec);
if (!ec)
eps.push_back(ep);
}
}
overlay_.peerFinder().onRedirects(remote_endpoint_, eps);
}
}
}
}
if (!OverlayImpl::isPeerUpgrade(response_)) if (!OverlayImpl::isPeerUpgrade(response_))
{ {
JLOG(journal_.info()) // A peer may respond with service_unavailable and a list of alternative
<< "Unable to upgrade to peer protocol: " << response_.result() // peers to connect to, a differing status code is unexpected
<< " (" << response_.reason() << ")"; if (response_.result() !=
return close(); boost::beast::http::status::service_unavailable)
{
JLOG(journal_.warn())
<< "Unable to upgrade to peer protocol: " << response_.result()
<< " (" << response_.reason() << ")";
return shutdown();
}
// Parse response body to determine if this is a redirect or other
// service unavailable
std::string responseBody;
responseBody.reserve(boost::asio::buffer_size(response_.body().data()));
for (auto const buffer : response_.body().data())
responseBody.append(
static_cast<char const*>(buffer.data()),
boost::asio::buffer_size(buffer));
Json::Value json;
Json::Reader reader;
auto const isValidJson = reader.parse(responseBody, json);
// Check if this is a redirect response (contains peer-ips field)
auto const isRedirect =
isValidJson && json.isObject() && json.isMember("peer-ips");
if (!isRedirect)
{
JLOG(journal_.warn())
<< "processResponse: " << remote_endpoint_
<< " failed to upgrade to peer protocol: " << response_.result()
<< " (" << response_.reason() << ")";
return shutdown();
}
Json::Value const& peerIps = json["peer-ips"];
if (!peerIps.isArray())
return fail("processResponse: invalid peer-ips format");
// Extract and validate peer endpoints
std::vector<boost::asio::ip::tcp::endpoint> redirectEndpoints;
redirectEndpoints.reserve(peerIps.size());
for (auto const& ipValue : peerIps)
{
if (!ipValue.isString())
continue;
error_code ec;
auto const endpoint = parse_endpoint(ipValue.asString(), ec);
if (!ec)
redirectEndpoints.push_back(endpoint);
}
// Notify PeerFinder about the redirect; redirectEndpoints may be empty
overlay_.peerFinder().onRedirects(remote_endpoint_, redirectEndpoints);
return fail("processResponse: failed to connect to peer: redirected");
} }
// Just because our peer selected a particular protocol version doesn't // Just because our peer selected a particular protocol version doesn't
@@ -381,11 +577,11 @@ ConnectAttempt::processResponse()
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_); auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
if (!sharedValue) if (!sharedValue)
return close(); // makeSharedValue logs return shutdown(); // makeSharedValue logs
try try
{ {
auto publicKey = verifyHandshake( auto const publicKey = verifyHandshake(
response_, response_,
*sharedValue, *sharedValue,
overlay_.setup().networkID, overlay_.setup().networkID,
@@ -393,11 +589,10 @@ ConnectAttempt::processResponse()
remote_endpoint_.address(), remote_endpoint_.address(),
app_); app_);
JLOG(journal_.info())
<< "Public Key: " << toBase58(TokenType::NodePublic, publicKey);
JLOG(journal_.debug()) JLOG(journal_.debug())
<< "Protocol: " << to_string(*negotiatedProtocol); << "Protocol: " << to_string(*negotiatedProtocol);
JLOG(journal_.info())
<< "Public Key: " << toBase58(TokenType::NodePublic, publicKey);
auto const member = app_.cluster().member(publicKey); auto const member = app_.cluster().member(publicKey);
if (member) if (member)
@@ -405,10 +600,21 @@ ConnectAttempt::processResponse()
JLOG(journal_.info()) << "Cluster name: " << *member; JLOG(journal_.info()) << "Cluster name: " << *member;
} }
auto const result = overlay_.peerFinder().activate( auto const result =
slot_, publicKey, static_cast<bool>(member)); overlay_.peerFinder().activate(slot_, publicKey, !member->empty());
if (result != PeerFinder::Result::success) if (result != PeerFinder::Result::success)
return fail("Outbound " + std::string(to_string(result))); {
std::stringstream ss;
ss << "Outbound Connect Attempt " << remote_endpoint_ << " "
<< to_string(result);
return fail(ss.str());
}
if (!socket_.is_open())
return;
if (shutdown_)
return tryAsyncShutdown();
auto const peer = std::make_shared<PeerImp>( auto const peer = std::make_shared<PeerImp>(
app_, app_,

View File

@@ -22,90 +22,258 @@
#include <xrpld/overlay/detail/OverlayImpl.h> #include <xrpld/overlay/detail/OverlayImpl.h>
#include <chrono>
namespace ripple { namespace ripple {
/** Manages an outbound connection attempt. */ /**
* @class ConnectAttempt
* @brief Manages outbound peer connection attempts with comprehensive timeout
* handling
*
* The ConnectAttempt class handles the complete lifecycle of establishing an
* outbound connection to a peer in the XRPL network. It implements a
* sophisticated dual-timer system that provides both global timeout protection
* and per-step timeout diagnostics.
*
* The connection establishment follows these steps:
* 1. **TCP Connect**: Establish basic network connection
* 2. **TLS Handshake**: Negotiate SSL/TLS encryption
* 3. **HTTP Write**: Send peer handshake request
* 4. **HTTP Read**: Receive and validate peer response
* 5. **Complete**: Connection successfully established
*
* Uses a hybrid timeout approach:
* - **Global Timer**: Hard limit (25s) for entire connection process
* - **Step Timers**: Individual timeouts for each connection phase
*
* - All errors result in connection termination
*
* All operations are serialized using boost::asio::strand to ensure thread
* safety. The class is designed to be used exclusively within the ASIO event
* loop.
*
* @note This class should not be used directly. It is managed by OverlayImpl
* as part of the peer discovery and connection management system.
*
*/
class ConnectAttempt : public OverlayImpl::Child, class ConnectAttempt : public OverlayImpl::Child,
public std::enable_shared_from_this<ConnectAttempt> public std::enable_shared_from_this<ConnectAttempt>
{ {
private: private:
using error_code = boost::system::error_code; using error_code = boost::system::error_code;
using endpoint_type = boost::asio::ip::tcp::endpoint; using endpoint_type = boost::asio::ip::tcp::endpoint;
using request_type = using request_type =
boost::beast::http::request<boost::beast::http::empty_body>; boost::beast::http::request<boost::beast::http::empty_body>;
using response_type = using response_type =
boost::beast::http::response<boost::beast::http::dynamic_body>; boost::beast::http::response<boost::beast::http::dynamic_body>;
using socket_type = boost::asio::ip::tcp::socket; using socket_type = boost::asio::ip::tcp::socket;
using middle_type = boost::beast::tcp_stream; using middle_type = boost::beast::tcp_stream;
using stream_type = boost::beast::ssl_stream<middle_type>; using stream_type = boost::beast::ssl_stream<middle_type>;
using shared_context = std::shared_ptr<boost::asio::ssl::context>; using shared_context = std::shared_ptr<boost::asio::ssl::context>;
/**
 * @enum ConnectionStep
 * @brief Phase of the connection establishment process an attempt is in
 *
 * Used for tracking progress and providing detailed timeout diagnostics.
 * The steps that wait on I/O have a timeout value defined in StepTimeouts;
 * Init and Complete do not arm a step timer.
 */
enum class ConnectionStep {
    /// Initial state, nothing started
    Init,
    /// Establishing TCP connection to remote peer
    TcpConnect,
    /// Performing SSL/TLS handshake
    TlsHandshake,
    /// Sending HTTP upgrade request
    HttpWrite,
    /// Reading HTTP upgrade response
    HttpRead,
    /// Connection successfully established
    Complete,
    /// Connection shutdown has started
    ShutdownStarted
};
// Overall timeout for the connection process. It is larger than every
// individual step timeout below, and larger than their 24s sum.
static constexpr auto connectTimeout = std::chrono::seconds{25};

/**
 * @struct StepTimeouts
 * @brief Per-step timeout values for a connection attempt
 *
 * Each value bounds a single phase so that a slow phase can be detected on
 * its own, while connectTimeout enforces the overall time limit.
 */
struct StepTimeouts
{
    /// Time allowed for establishing the TCP connection
    static constexpr auto tcpConnect = std::chrono::seconds{8};

    /// Time allowed for the SSL handshake
    static constexpr auto tlsHandshake = std::chrono::seconds{8};

    /// Time allowed for writing the HTTP request
    static constexpr auto httpWrite = std::chrono::seconds{3};

    /// Time allowed for reading the HTTP response
    static constexpr auto httpRead = std::chrono::seconds{3};

    /// Time allowed for the SSL shutdown
    static constexpr auto tlsShutdown = std::chrono::seconds{2};
};
// Core application and networking components
Application& app_; Application& app_;
std::uint32_t const id_; Peer::id_t const id_;
beast::WrappedSink sink_; beast::WrappedSink sink_;
beast::Journal const journal_; beast::Journal const journal_;
endpoint_type remote_endpoint_; endpoint_type remote_endpoint_;
Resource::Consumer usage_; Resource::Consumer usage_;
boost::asio::strand<boost::asio::io_context::executor_type> strand_; boost::asio::strand<boost::asio::io_context::executor_type> strand_;
boost::asio::basic_waitable_timer<std::chrono::steady_clock> timer_; boost::asio::basic_waitable_timer<std::chrono::steady_clock> timer_;
std::unique_ptr<stream_type> stream_ptr_; boost::asio::basic_waitable_timer<std::chrono::steady_clock> stepTimer_;
std::unique_ptr<stream_type> stream_ptr_; // SSL stream (owned)
socket_type& socket_; socket_type& socket_;
stream_type& stream_; stream_type& stream_;
boost::beast::multi_buffer read_buf_; boost::beast::multi_buffer read_buf_;
response_type response_; response_type response_;
std::shared_ptr<PeerFinder::Slot> slot_; std::shared_ptr<PeerFinder::Slot> slot_;
request_type req_; request_type req_;
bool shutdown_ = false; // Shutdown has been initiated
bool ioPending_ = false; // Async I/O operation in progress
ConnectionStep currentStep_ = ConnectionStep::Init;
public: public:
/**
* @brief Construct a new ConnectAttempt object
*
* @param app Application context providing configuration and services
* @param io_context ASIO I/O context for async operations
* @param remote_endpoint Target peer endpoint to connect to
* @param usage Resource usage tracker for rate limiting
* @param context Shared SSL context for encryption
* @param id Unique peer identifier for this connection attempt
* @param slot PeerFinder slot representing this connection
* @param journal Logging interface for diagnostics
* @param overlay Parent overlay manager
*
* @note The constructor only initializes the object. Call run() to begin
* the actual connection attempt.
*/
ConnectAttempt( ConnectAttempt(
Application& app, Application& app,
boost::asio::io_context& io_context, boost::asio::io_context& io_context,
endpoint_type const& remote_endpoint, endpoint_type const& remote_endpoint,
Resource::Consumer usage, Resource::Consumer usage,
shared_context const& context, shared_context const& context,
std::uint32_t id, Peer::id_t id,
std::shared_ptr<PeerFinder::Slot> const& slot, std::shared_ptr<PeerFinder::Slot> const& slot,
beast::Journal journal, beast::Journal journal,
OverlayImpl& overlay); OverlayImpl& overlay);
~ConnectAttempt(); ~ConnectAttempt();
/**
* @brief Stop the connection attempt
*
* This method is thread-safe and can be called from any thread.
*/
void void
stop() override; stop() override;
/**
* @brief Begin the connection attempt
*
* This method is thread-safe and posts to the strand if needed.
*/
void void
run(); run();
private: private:
/**
* @brief Set timers for the specified connection step
*
* @param step The connection step to set timers for
*
* Sets both the step-specific timer and the global timer (if not already
* set).
*/
void void
close(); setTimer(ConnectionStep step);
void
fail(std::string const& reason); /**
void * @brief Cancel both global and step timers
fail(std::string const& name, error_code ec); *
void * Used during cleanup and when connection completes successfully.
setTimer(); * Exceptions from timer cancellation are safely ignored.
*/
void void
cancelTimer(); cancelTimer();
/**
* @brief Handle timer expiration events
*
* @param ec Error code from timer operation
*
* Determines which timer expired (global vs step) and logs appropriate
* diagnostic information before terminating the connection.
*/
void void
onTimer(error_code ec); onTimer(error_code ec);
// Connection phase handlers
void void
onConnect(error_code ec); onConnect(error_code ec); // TCP connection completion handler
void void
onHandshake(error_code ec); onHandshake(error_code ec); // TLS handshake completion handler
void void
onWrite(error_code ec); onWrite(error_code ec); // HTTP write completion handler
void void
onRead(error_code ec); onRead(error_code ec); // HTTP read completion handler
// Error and cleanup handlers
void void
onShutdown(error_code ec); fail(std::string const& reason); // Fail with custom reason
void
fail(std::string const& name, error_code ec); // Fail with system error
void
shutdown(); // Initiate graceful shutdown
void
tryAsyncShutdown(); // Attempt async SSL shutdown
void
onShutdown(error_code ec); // SSL shutdown completion handler
void
close(); // Force close socket
/**
* @brief Process the HTTP upgrade response from peer
*
* Validates the peer's response, extracts protocol information,
* verifies handshake, and either creates a PeerImp or handles
* redirect responses.
*/
void void
processResponse(); processResponse();
/**
 * @brief Name of a connection step, as used in log messages
 *
 * @param step The connection step to name
 * @return The enumerator's name, or "Unknown" if the value matches no
 * enumerator
 */
static std::string
stepToString(ConnectionStep step)
{
    // No default label, so the compiler can warn when an enumerator is
    // added without a matching case.
    switch (step)
    {
        case ConnectionStep::Init:
            return "Init";
        case ConnectionStep::TcpConnect:
            return "TcpConnect";
        case ConnectionStep::TlsHandshake:
            return "TlsHandshake";
        case ConnectionStep::HttpWrite:
            return "HttpWrite";
        case ConnectionStep::HttpRead:
            return "HttpRead";
        case ConnectionStep::Complete:
            return "Complete";
        case ConnectionStep::ShutdownStarted:
            return "ShutdownStarted";
    }
    return "Unknown";
}
template <class = void> template <class = void>
static boost::asio::ip::tcp::endpoint static boost::asio::ip::tcp::endpoint
parse_endpoint(std::string const& s, boost::system::error_code& ec) parse_endpoint(std::string const& s, boost::system::error_code& ec)

View File

@@ -44,6 +44,7 @@
#include <boost/beast/core/ostream.hpp> #include <boost/beast/core/ostream.hpp>
#include <algorithm> #include <algorithm>
#include <chrono>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <numeric> #include <numeric>
@@ -59,6 +60,10 @@ std::chrono::milliseconds constexpr peerHighLatency{300};
/** How often we PING the peer to check for latency and sendq probe */ /** How often we PING the peer to check for latency and sendq probe */
std::chrono::seconds constexpr peerTimerInterval{60}; std::chrono::seconds constexpr peerTimerInterval{60};
/** The timeout for a shutdown timer */
std::chrono::seconds constexpr shutdownTimerInterval{5};
} // namespace } // namespace
// TODO: Remove this exclusion once unit tests are added after the hotfix // TODO: Remove this exclusion once unit tests are added after the hotfix
@@ -215,23 +220,17 @@ PeerImp::stop()
{ {
if (!strand_.running_in_this_thread()) if (!strand_.running_in_this_thread())
return post(strand_, std::bind(&PeerImp::stop, shared_from_this())); return post(strand_, std::bind(&PeerImp::stop, shared_from_this()));
if (socket_.is_open())
{ if (!socket_.is_open())
// The rationale for using different severity levels is that return;
// outbound connections are under our control and may be logged
// at a higher level, but inbound connections are more numerous and // The rationale for using different severity levels is that
// uncontrolled so to prevent log flooding the severity is reduced. // outbound connections are under our control and may be logged
// // at a higher level, but inbound connections are more numerous and
if (inbound_) // uncontrolled so to prevent log flooding the severity is reduced.
{ JLOG(journal_.debug()) << "stop: Stop";
JLOG(journal_.debug()) << "Stop";
} shutdown();
else
{
JLOG(journal_.info()) << "Stop";
}
}
close();
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@@ -241,11 +240,14 @@ PeerImp::send(std::shared_ptr<Message> const& m)
{ {
if (!strand_.running_in_this_thread()) if (!strand_.running_in_this_thread())
return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m)); return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m));
if (gracefulClose_)
return; if (!socket_.is_open())
if (detaching_)
return; return;
// we are in progress of closing the connection
if (shutdown_)
return tryAsyncShutdown();
auto validator = m->getValidatorKey(); auto validator = m->getValidatorKey();
if (validator && !squelch_.expireSquelch(*validator)) if (validator && !squelch_.expireSquelch(*validator))
{ {
@@ -287,6 +289,7 @@ PeerImp::send(std::shared_ptr<Message> const& m)
if (sendq_size != 0) if (sendq_size != 0)
return; return;
writePending_ = true;
boost::asio::async_write( boost::asio::async_write(
stream_, stream_,
boost::asio::buffer( boost::asio::buffer(
@@ -573,34 +576,21 @@ PeerImp::hasRange(std::uint32_t uMin, std::uint32_t uMax)
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
void void
PeerImp::close() PeerImp::fail(std::string const& name, error_code ec)
{ {
XRPL_ASSERT( XRPL_ASSERT(
strand_.running_in_this_thread(), strand_.running_in_this_thread(),
"ripple::PeerImp::close : strand in this thread"); "ripple::PeerImp::fail : strand in this thread");
if (socket_.is_open())
{
detaching_ = true; // DEPRECATED
try
{
timer_.cancel();
socket_.close();
}
catch (boost::system::system_error const&)
{
// ignored
}
overlay_.incPeerDisconnect(); if (!socket_.is_open())
if (inbound_) return;
{
JLOG(journal_.debug()) << "Closed"; JLOG(journal_.warn()) << name << " from "
} << toBase58(TokenType::NodePublic, publicKey_)
else << " at " << remote_address_.to_string() << ": "
{ << ec.message();
JLOG(journal_.info()) << "Closed";
} shutdown();
}
} }
void void
@@ -613,45 +603,39 @@ PeerImp::fail(std::string const& reason)
(void(Peer::*)(std::string const&)) & PeerImp::fail, (void(Peer::*)(std::string const&)) & PeerImp::fail,
shared_from_this(), shared_from_this(),
reason)); reason));
if (journal_.active(beast::severities::kWarning) && socket_.is_open())
if (!socket_.is_open())
return;
// Calling name() takes a lock, so only do it if the message will be logged
if (journal_.active(beast::severities::kWarning))
{ {
std::string const n = name(); std::string const n = name();
JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n) JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n)
<< " failed: " << reason; << " failed: " << reason;
} }
close();
shutdown();
} }
void void
PeerImp::fail(std::string const& name, error_code ec) PeerImp::tryAsyncShutdown()
{ {
XRPL_ASSERT( XRPL_ASSERT(
strand_.running_in_this_thread(), strand_.running_in_this_thread(),
"ripple::PeerImp::fail : strand in this thread"); "ripple::PeerImp::tryAsyncShutdown : strand in this thread");
if (socket_.is_open())
{
JLOG(journal_.warn())
<< name << " from " << toBase58(TokenType::NodePublic, publicKey_)
<< " at " << remote_address_.to_string() << ": " << ec.message();
}
close();
}
void if (!shutdown_ || shutdownStarted_)
PeerImp::gracefulClose()
{
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::PeerImp::gracefulClose : strand in this thread");
XRPL_ASSERT(
socket_.is_open(), "ripple::PeerImp::gracefulClose : socket is open");
XRPL_ASSERT(
!gracefulClose_,
"ripple::PeerImp::gracefulClose : socket is not closing");
gracefulClose_ = true;
if (send_queue_.size() > 0)
return; return;
setTimer();
if (readPending_ || writePending_)
return;
shutdownStarted_ = true;
setTimer(shutdownTimerInterval);
// gracefully shutdown the SSL socket, performing a shutdown handshake
stream_.async_shutdown(bind_executor( stream_.async_shutdown(bind_executor(
strand_, strand_,
std::bind( std::bind(
@@ -659,69 +643,125 @@ PeerImp::gracefulClose()
} }
void void
PeerImp::setTimer() PeerImp::shutdown()
{
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::PeerImp::shutdown: strand in this thread");
if (!socket_.is_open() || shutdown_)
return;
shutdown_ = true;
boost::beast::get_lowest_layer(stream_).cancel();
tryAsyncShutdown();
}
/**
 * Completion handler for the asynchronous SSL shutdown.
 *
 * Cancels the timer, logs the error code unless it is one of the expected
 * outcomes listed below, and then closes the socket in every case.
 *
 * @param ec Result of the `async_shutdown` operation.
 */
void
PeerImp::onShutdown(error_code ec)
{
    cancelTimer();

    // Outcomes that are NOT logged:
    //  - success (no error code)
    //  - eof: the stream was cleanly closed
    //  - operation_aborted: the shutdown was cancelled, e.g. the shutdown
    //    timer expired and the socket was force closed
    //  - any error whose message contains
    //    "application data after close notify"
    //
    // Every other error (stream_truncated and broken_pipe included) is
    // reported at debug severity.
    bool const expected = !ec || ec == boost::asio::error::eof ||
        ec == boost::asio::error::operation_aborted ||
        ec.message().find("application data after close notify") !=
            std::string::npos;

    if (!expected)
    {
        JLOG(journal_.debug()) << "onShutdown: " << ec.message();
    }

    close();
}
void
PeerImp::close()
{
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::PeerImp::close : strand in this thread");
if (!socket_.is_open())
return;
cancelTimer();
error_code ec;
socket_.close(ec);
overlay_.incPeerDisconnect();
// The rationale for using different severity levels is that
// outbound connections are under our control and may be logged
// at a higher level, but inbound connections are more numerous and
// uncontrolled so to prevent log flooding the severity is reduced.
JLOG((inbound_ ? journal_.debug() : journal_.info())) << "close: Closed";
}
//------------------------------------------------------------------------------
void
PeerImp::setTimer(std::chrono::seconds interval)
{ {
try try
{ {
timer_.expires_after(peerTimerInterval); timer_.expires_after(interval);
} }
catch (boost::system::system_error const& e) catch (std::exception const& ex)
{ {
JLOG(journal_.error()) << "setTimer: " << e.code(); JLOG(journal_.error()) << "setTimer: " << ex.what();
return; return shutdown();
} }
timer_.async_wait(bind_executor( timer_.async_wait(bind_executor(
strand_, strand_,
std::bind( std::bind(
&PeerImp::onTimer, shared_from_this(), std::placeholders::_1))); &PeerImp::onTimer, shared_from_this(), std::placeholders::_1)));
} }
// Convenience wrapper around timer_.cancel() for callers that do not care
// whether the cancellation succeeded.
//
// A boost::system::system_error thrown by cancel() is caught and discarded;
// no other exception type is caught here.
void
PeerImp::cancelTimer()
{
try
{
timer_.cancel();
}
catch (boost::system::system_error const&)
{
// ignored: there is nothing useful to do if cancellation fails
}
}
//------------------------------------------------------------------------------
/**
 * Builds the prefix string for a peer id.
 *
 * @param id The peer id to format.
 * @return The id padded with leading zeros to a minimum width of three and
 *         wrapped as "[NNN] " (note the trailing space).
 */
std::string
PeerImp::makePrefix(id_t id)
{
    std::stringstream prefix;

    prefix << "[";
    // setw applies only to the next insertion, i.e. the id itself
    prefix << std::setfill('0') << std::setw(3) << id;
    prefix << "] ";

    return prefix.str();
}
void void
PeerImp::onTimer(error_code const& ec) PeerImp::onTimer(error_code const& ec)
{ {
if (!socket_.is_open()) XRPL_ASSERT(
return; strand_.running_in_this_thread(),
"ripple::PeerImp::onTimer : strand in this thread");
if (ec == boost::asio::error::operation_aborted) if (!socket_.is_open())
return; return;
if (ec) if (ec)
{ {
// do not initiate shutdown, timers are frequently cancelled
if (ec == boost::asio::error::operation_aborted)
return;
// This should never happen // This should never happen
JLOG(journal_.error()) << "onTimer: " << ec.message(); JLOG(journal_.error()) << "onTimer: " << ec.message();
return close(); return close();
} }
if (large_sendq_++ >= Tuning::sendqIntervals) // the timer expired before the shutdown completed
// force close the connection
if (shutdown_)
{ {
fail("Large send queue"); JLOG(journal_.debug()) << "onTimer: shutdown timer expired";
return; return close();
} }
if (large_sendq_++ >= Tuning::sendqIntervals)
return fail("Large send queue");
if (auto const t = tracking_.load(); !inbound_ && t != Tracking::converged) if (auto const t = tracking_.load(); !inbound_ && t != Tracking::converged)
{ {
clock_type::duration duration; clock_type::duration duration;
@@ -737,17 +777,13 @@ PeerImp::onTimer(error_code const& ec)
(duration > app_.config().MAX_UNKNOWN_TIME))) (duration > app_.config().MAX_UNKNOWN_TIME)))
{ {
overlay_.peerFinder().on_failure(slot_); overlay_.peerFinder().on_failure(slot_);
fail("Not useful"); return fail("Not useful");
return;
} }
} }
// Already waiting for PONG // Already waiting for PONG
if (lastPingSeq_) if (lastPingSeq_)
{ return fail("Ping Timeout");
fail("Ping Timeout");
return;
}
lastPingTime_ = clock_type::now(); lastPingTime_ = clock_type::now();
lastPingSeq_ = rand_int<std::uint32_t>(); lastPingSeq_ = rand_int<std::uint32_t>();
@@ -758,22 +794,28 @@ PeerImp::onTimer(error_code const& ec)
send(std::make_shared<Message>(message, protocol::mtPING)); send(std::make_shared<Message>(message, protocol::mtPING));
setTimer(); setTimer(peerTimerInterval);
} }
void void
PeerImp::onShutdown(error_code ec) PeerImp::cancelTimer() noexcept
{ {
cancelTimer(); try
// If we don't get eof then something went wrong
if (!ec)
{ {
JLOG(journal_.error()) << "onShutdown: expected error condition"; timer_.cancel();
return close();
} }
if (ec != boost::asio::error::eof) catch (std::exception const& ex)
return fail("onShutdown", ec); {
close(); JLOG(journal_.error()) << "cancelTimer: " << ex.what();
}
}
std::string
PeerImp::makePrefix(id_t id)
{
std::stringstream ss;
ss << "[" << std::setfill('0') << std::setw(3) << id << "] ";
return ss.str();
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@@ -786,6 +828,10 @@ PeerImp::doAccept()
JLOG(journal_.debug()) << "doAccept: " << remote_address_; JLOG(journal_.debug()) << "doAccept: " << remote_address_;
// a shutdown was initiated before the handshake, there is nothing to do
if (shutdown_)
return tryAsyncShutdown();
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_); auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
// This shouldn't fail since we already computed // This shouldn't fail since we already computed
@@ -793,7 +839,7 @@ PeerImp::doAccept()
if (!sharedValue) if (!sharedValue)
return fail("makeSharedValue: Unexpected failure"); return fail("makeSharedValue: Unexpected failure");
JLOG(journal_.info()) << "Protocol: " << to_string(protocol_); JLOG(journal_.debug()) << "Protocol: " << to_string(protocol_);
JLOG(journal_.info()) << "Public Key: " JLOG(journal_.info()) << "Public Key: "
<< toBase58(TokenType::NodePublic, publicKey_); << toBase58(TokenType::NodePublic, publicKey_);
@@ -836,7 +882,7 @@ PeerImp::doAccept()
if (!socket_.is_open()) if (!socket_.is_open())
return; return;
if (ec == boost::asio::error::operation_aborted) if (ec == boost::asio::error::operation_aborted)
return; return tryAsyncShutdown();
if (ec) if (ec)
return fail("onWriteResponse", ec); return fail("onWriteResponse", ec);
if (write_buffer->size() == bytes_transferred) if (write_buffer->size() == bytes_transferred)
@@ -865,6 +911,10 @@ PeerImp::domain() const
void void
PeerImp::doProtocolStart() PeerImp::doProtocolStart()
{ {
// a shutdown was initiated before the handshake, there is nothing to do
if (shutdown_)
return tryAsyncShutdown();
onReadMessage(error_code(), 0); onReadMessage(error_code(), 0);
// Send all the validator lists that have been loaded // Send all the validator lists that have been loaded
@@ -896,30 +946,45 @@ PeerImp::doProtocolStart()
if (auto m = overlay_.getManifestsMessage()) if (auto m = overlay_.getManifestsMessage())
send(m); send(m);
setTimer(); setTimer(peerTimerInterval);
} }
// Called repeatedly with protocol message data // Called repeatedly with protocol message data
void void
PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred) PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
{ {
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::PeerImp::onReadMessage : strand in this thread");
readPending_ = false;
if (!socket_.is_open()) if (!socket_.is_open())
return; return;
if (ec == boost::asio::error::operation_aborted)
return;
if (ec == boost::asio::error::eof)
{
JLOG(journal_.info()) << "EOF";
return gracefulClose();
}
if (ec) if (ec)
{
if (ec == boost::asio::error::eof)
{
JLOG(journal_.debug()) << "EOF";
return shutdown();
}
if (ec == boost::asio::error::operation_aborted)
return tryAsyncShutdown();
return fail("onReadMessage", ec); return fail("onReadMessage", ec);
}
// we started shutdown, no reason to process further data
if (shutdown_)
return tryAsyncShutdown();
if (auto stream = journal_.trace()) if (auto stream = journal_.trace())
{ {
if (bytes_transferred > 0) stream << "onReadMessage: "
stream << "onReadMessage: " << bytes_transferred << " bytes"; << (bytes_transferred > 0
else ? to_string(bytes_transferred) + " bytes"
stream << "onReadMessage"; : "");
} }
metrics_.recv.add_message(bytes_transferred); metrics_.recv.add_message(bytes_transferred);
@@ -941,17 +1006,29 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
350ms, 350ms,
journal_); journal_);
if (ec)
return fail("onReadMessage", ec);
if (!socket_.is_open()) if (!socket_.is_open())
return; return;
if (gracefulClose_)
return; // the error_code is produced by invokeProtocolMessage
// it could be due to a bad message
if (ec)
return fail("onReadMessage", ec);
if (bytes_consumed == 0) if (bytes_consumed == 0)
break; break;
read_buffer_.consume(bytes_consumed); read_buffer_.consume(bytes_consumed);
} }
// check if a shutdown was initiated while processing messages
if (shutdown_)
return tryAsyncShutdown();
readPending_ = true;
XRPL_ASSERT(
!shutdownStarted_, "ripple::PeerImp::onReadMessage : shutdown started");
// Timeout on writes only // Timeout on writes only
stream_.async_read_some( stream_.async_read_some(
read_buffer_.prepare(std::max(Tuning::readBufferBytes, hint)), read_buffer_.prepare(std::max(Tuning::readBufferBytes, hint)),
@@ -967,18 +1044,29 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
void void
PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred) PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
{ {
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::PeerImp::onWriteMessage : strand in this thread");
writePending_ = false;
if (!socket_.is_open()) if (!socket_.is_open())
return; return;
if (ec == boost::asio::error::operation_aborted)
return;
if (ec) if (ec)
{
if (ec == boost::asio::error::operation_aborted)
return tryAsyncShutdown();
return fail("onWriteMessage", ec); return fail("onWriteMessage", ec);
}
if (auto stream = journal_.trace()) if (auto stream = journal_.trace())
{ {
if (bytes_transferred > 0) stream << "onWriteMessage: "
stream << "onWriteMessage: " << bytes_transferred << " bytes"; << (bytes_transferred > 0
else ? to_string(bytes_transferred) + " bytes"
stream << "onWriteMessage"; : "");
} }
metrics_.sent.add_message(bytes_transferred); metrics_.sent.add_message(bytes_transferred);
@@ -987,8 +1075,17 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
!send_queue_.empty(), !send_queue_.empty(),
"ripple::PeerImp::onWriteMessage : non-empty send buffer"); "ripple::PeerImp::onWriteMessage : non-empty send buffer");
send_queue_.pop(); send_queue_.pop();
if (shutdown_)
return tryAsyncShutdown();
if (!send_queue_.empty()) if (!send_queue_.empty())
{ {
writePending_ = true;
XRPL_ASSERT(
!shutdownStarted_,
"ripple::PeerImp::onWriteMessage : shutdown started");
// Timeout on writes only // Timeout on writes only
return boost::asio::async_write( return boost::asio::async_write(
stream_, stream_,
@@ -1002,16 +1099,6 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
std::placeholders::_1, std::placeholders::_1,
std::placeholders::_2))); std::placeholders::_2)));
} }
if (gracefulClose_)
{
return stream_.async_shutdown(bind_executor(
strand_,
std::bind(
&PeerImp::onShutdown,
shared_from_this(),
std::placeholders::_1)));
}
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------

View File

@@ -40,6 +40,7 @@
#include <boost/endian/conversion.hpp> #include <boost/endian/conversion.hpp>
#include <boost/thread/shared_mutex.hpp> #include <boost/thread/shared_mutex.hpp>
#include <atomic>
#include <cstdint> #include <cstdint>
#include <optional> #include <optional>
#include <queue> #include <queue>
@@ -49,6 +50,68 @@ namespace ripple {
struct ValidatorBlobInfo; struct ValidatorBlobInfo;
class SHAMap; class SHAMap;
/**
* @class PeerImp
* @brief Manages an established peer-to-peer connection: handles message
* exchange, monitors connection health, and performs graceful shutdown.
*
* The PeerImp shutdown mechanism is a multi-stage process
* designed to ensure graceful connection termination while handling ongoing
* I/O operations safely. The shutdown can be initiated from multiple points
* and follows a deterministic state machine.
*
* The shutdown process can be triggered from several entry points:
* - **External requests**: `stop()` method called by overlay management
* - **Error conditions**: `fail(error_code)` or `fail(string)` on protocol
* violations
* - **Timer expiration**: Various timeout scenarios (ping timeout, large send
* queue)
* - **Connection health**: Peer tracking divergence or unknown state timeouts
*
* The shutdown follows this progression:
*
* Normal Operation → shutdown() → tryAsyncShutdown() → onShutdown() → close()
* ↓ ↓ ↓ ↓
* Set shutdown_ SSL graceful Timer cancel Socket close
* Cancel timer shutdown start & cleanup & metrics
* 5s safety timer Set shutdownStarted_ update
*
* Two primary flags coordinate the shutdown process:
* - `shutdown_`: Set when shutdown is requested
* - `shutdownStarted_`: Set when SSL shutdown begins
*
* The shutdown mechanism carefully coordinates with ongoing read/write
* operations:
*
* **Read Operations (`onReadMessage`)**:
* - Checks `shutdown_` flag after processing each message batch
* - If shutdown initiated during processing, calls `tryAsyncShutdown()`
*
* **Write Operations (`onWriteMessage`)**:
* - Checks `shutdown_` flag before queuing new writes
* - Calls `tryAsyncShutdown()` when shutdown flag detected
*
* Multiple timers require coordination during shutdown:
* 1. **Peer Timer**: Regular ping/pong timer; it is re-armed as the shutdown
* timer by `tryAsyncShutdown()` and cancelled in `onShutdown()` / `close()`
* 2. **Shutdown Timer**: 5-second safety timer ensures shutdown completion
* 3. **Operation Cancellation**: All pending async operations are cancelled
*
* The shutdown implements fallback mechanisms:
* - **Graceful Path**: SSL shutdown → Socket close → Cleanup
* - **Forced Path**: If SSL shutdown fails or times out, proceeds to socket
* close
* - **Safety Timer**: 5-second timeout prevents hanging shutdowns
*
* All shutdown operations are serialized through the boost::asio::strand to
* ensure thread safety. The strand guarantees that shutdown state changes
* and I/O operation callbacks are executed sequentially.
*
* @note This class requires careful coordination between async operations,
* timer management, and shutdown procedures to ensure no resource leaks
* or hanging connections in high-throughput networking scenarios.
*/
class PeerImp : public Peer, class PeerImp : public Peer,
public std::enable_shared_from_this<PeerImp>, public std::enable_shared_from_this<PeerImp>,
public OverlayImpl::Child public OverlayImpl::Child
@@ -79,6 +142,8 @@ private:
socket_type& socket_; socket_type& socket_;
stream_type& stream_; stream_type& stream_;
boost::asio::strand<boost::asio::executor> strand_; boost::asio::strand<boost::asio::executor> strand_;
// Multi-purpose timer for peer activity monitoring and shutdown safety
waitable_timer timer_; waitable_timer timer_;
// Updated at each stage of the connection process to reflect // Updated at each stage of the connection process to reflect
@@ -95,7 +160,6 @@ private:
std::atomic<Tracking> tracking_; std::atomic<Tracking> tracking_;
clock_type::time_point trackingTime_; clock_type::time_point trackingTime_;
bool detaching_ = false;
// Node public key of peer. // Node public key of peer.
PublicKey const publicKey_; PublicKey const publicKey_;
std::string name_; std::string name_;
@@ -175,7 +239,19 @@ private:
http_response_type response_; http_response_type response_;
boost::beast::http::fields const& headers_; boost::beast::http::fields const& headers_;
std::queue<std::shared_ptr<Message>> send_queue_; std::queue<std::shared_ptr<Message>> send_queue_;
bool gracefulClose_ = false;
// Primary shutdown flag set when shutdown is requested
bool shutdown_ = false;
// SSL shutdown coordination flag
bool shutdownStarted_ = false;
// Indicates a read operation is currently pending
bool readPending_ = false;
// Indicates a write operation is currently pending
bool writePending_ = false;
int large_sendq_ = 0; int large_sendq_ = 0;
std::unique_ptr<LoadEvent> load_event_; std::unique_ptr<LoadEvent> load_event_;
// The highest sequence of each PublisherList that has // The highest sequence of each PublisherList that has
@@ -425,9 +501,6 @@ public:
bool bool
isHighLatency() const override; isHighLatency() const override;
void
fail(std::string const& reason);
bool bool
compressionEnabled() const override compressionEnabled() const override
{ {
@@ -441,32 +514,129 @@ public:
} }
private: private:
void /**
close(); * @brief Handles a failure associated with a specific error code.
*
* This function is called when an operation fails with an error code. It
* logs a warning message and gracefully shuts down the connection.
*
* The function will do nothing if the connection is already closed or if a
* shutdown is already in progress.
*
* @param name The name of the operation that failed (e.g., "read",
* "write").
* @param ec The error code associated with the failure.
* @note This function must be called from within the object's strand.
*/
void void
fail(std::string const& name, error_code ec); fail(std::string const& name, error_code ec);
/**
* @brief Handles a failure described by a reason string.
*
* This overload is used for logical errors or protocol violations not
* associated with a specific error code. It logs a warning with the
* given reason, then initiates a graceful shutdown.
*
* The function will do nothing if the connection is already closed or if a
* shutdown is already in progress.
*
* @param reason A descriptive string explaining the reason for the failure.
* @note This function must be called from within the object's strand.
*/
void void
gracefulClose(); fail(std::string const& reason);
/** @brief Initiates the peer disconnection sequence.
*
* This is the primary entry point to start closing a peer connection. It
* marks the peer for shutdown and cancels any outstanding asynchronous
* operations. This cancellation allows the graceful shutdown to proceed
* once the handlers for the cancelled operations have completed.
*
* @note This method must be called on the peer's strand.
*/
void void
setTimer(); shutdown();
/** @brief Attempts to perform a graceful SSL shutdown if conditions are
* met.
*
* This helper function checks if the peer is in a state where a graceful
* SSL shutdown can be performed (i.e., shutdown has been requested and no
* I/O operations are currently in progress).
*
* @note This method must be called on the peer's strand.
*/
void void
cancelTimer(); tryAsyncShutdown();
/**
* @brief Handles the completion of the asynchronous SSL shutdown.
*
* This function is the callback for the `async_shutdown` operation started
* in `tryAsyncShutdown()`. Its first action is to cancel the timer. It
* then inspects the error code to decide whether the outcome is logged.
*
* Regardless of the result, this function proceeds to call `close()` to
* ensure the underlying socket is fully closed.
*
* @param ec The error code resulting from the `async_shutdown` operation.
*/
void
onShutdown(error_code ec);
/**
* @brief Forcibly closes the underlying socket connection.
*
* This function provides the final, non-graceful shutdown of the peer
* connection. It ensures any pending timers are cancelled and then
* immediately closes the TCP socket, bypassing the SSL shutdown handshake.
*
* After closing, it notifies the overlay manager of the disconnection.
*
* @note This function must be called from within the object's strand.
*/
void
close();
/**
* @brief Sets and starts the peer timer.
*
* This function starts the timer, which is used to detect inactivity and
* prevent stalled connections. It sets the timer to expire after the given
* `interval` (callers pass `peerTimerInterval` or `shutdownTimerInterval`).
*
* @note This function will terminate the connection in case of any errors.
*/
void
setTimer(std::chrono::seconds interval);
/**
* @brief Handles the expiration of the peer activity timer.
*
* This callback is invoked when the timer set by `setTimer` expires. It
* watches the peer connection, checking for various timeout and health
* conditions.
*
* @param ec The error code associated with the timer's expiration.
* `operation_aborted` is expected if the timer was cancelled.
*/
void
onTimer(error_code const& ec);
/**
* @brief Cancels any pending wait on the peer activity timer.
*
* This function is called to stop the timer. It gracefully manages any
* errors that might occur during the cancellation process.
*/
void
cancelTimer() noexcept;
static std::string static std::string
makePrefix(id_t id); makePrefix(id_t id);
// Called when the timer wait completes
void
onTimer(boost::system::error_code const& ec);
// Called when SSL shutdown completes
void
onShutdown(error_code ec);
void void
doAccept(); doAccept();