adds graceful shutdown to connect attempt

This commit is contained in:
Vito
2025-08-21 10:54:17 +02:00
parent d9884eefa1
commit 5b27b7e429
2 changed files with 77 additions and 29 deletions

View File

@@ -83,6 +83,8 @@ ConnectAttempt::stop()
void void
ConnectAttempt::run() ConnectAttempt::run()
{ {
isIOInProgress_ = true;
stream_.next_layer().async_connect( stream_.next_layer().async_connect(
remote_endpoint_, remote_endpoint_,
strand_.wrap(std::bind( strand_.wrap(std::bind(
@@ -100,22 +102,44 @@ ConnectAttempt::shutdown()
strand_.running_in_this_thread(), strand_.running_in_this_thread(),
"ripple::ConnectAttempt::shutdown: strand in this thread"); "ripple::ConnectAttempt::shutdown: strand in this thread");
if (!socket_.is_open() || shutdown_) if (!socket_.is_open())
return; return;
shutdown_ = true; shutdown_ = true;
stream_.next_layer().cancel();
close(); boost::beast::get_lowest_layer(stream_).cancel();
// setTimer();
// // gracefully shutdown the SSL socket, performing a shutdown handshake tryAsyncShutdown();
// stream_.async_shutdown(bind_executor( }
// strand_,
// std::bind( void
// &ConnectAttempt::onShutdown, ConnectAttempt::tryAsyncShutdown()
// shared_from_this(), {
// std::placeholders::_1))); XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::ConnectAttempt::tryAsyncShutdown : strand in this thread");
if (!shutdown_ || shutdownStarted_)
return;
if (isIOInProgress_)
return;
shutdownStarted_ = true;
XRPL_ASSERT(
!isIOInProgress_,
"ripple::ConnectAttempt::tryAsyncShutdown: io not in progress");
setTimer();
// gracefully shutdown the SSL socket, performing a shutdown handshake
stream_.async_shutdown(bind_executor(
strand_,
std::bind(
&ConnectAttempt::onShutdown,
shared_from_this(),
std::placeholders::_1)));
} }
void void
@@ -133,7 +157,7 @@ ConnectAttempt::onShutdown(error_code ec)
if (ec != boost::asio::error::eof && if (ec != boost::asio::error::eof &&
ec != boost::asio::error::operation_aborted) ec != boost::asio::error::operation_aborted)
{ {
JLOG(journal_.warn()) << "onShutdown: " << ec.message(); JLOG(journal_.debug()) << "onShutdown: " << ec.message();
} }
} }
@@ -204,7 +228,7 @@ ConnectAttempt::onTimer(error_code ec)
if (ec) if (ec)
{ {
if (ec == boost::asio::error::operation_aborted) if (ec == boost::asio::error::operation_aborted)
return; return tryAsyncShutdown();
// This should never happen // This should never happen
JLOG(journal_.error()) << "onTimer: " << ec.message(); JLOG(journal_.error()) << "onTimer: " << ec.message();
@@ -219,14 +243,19 @@ ConnectAttempt::onConnect(error_code ec)
{ {
cancelTimer(); cancelTimer();
isIOInProgress_ = false;
if (ec) if (ec)
{ {
if (ec == boost::asio::error::operation_aborted) if (ec == boost::asio::error::operation_aborted)
return; return tryAsyncShutdown();
return fail("onConnect", ec); return fail("onConnect", ec);
} }
if (!socket_.is_open())
return;
// check if connection has really been established // check if connection has really been established
socket_.local_endpoint(ec); socket_.local_endpoint(ec);
if (ec) if (ec)
@@ -234,11 +263,13 @@ ConnectAttempt::onConnect(error_code ec)
JLOG(journal_.trace()) << "onConnect"; JLOG(journal_.trace()) << "onConnect";
if (!socket_.is_open() || shutdown_) if (shutdown_)
return; return tryAsyncShutdown();
setTimer(); setTimer();
isIOInProgress_ = true;
stream_.set_verify_mode(boost::asio::ssl::verify_none); stream_.set_verify_mode(boost::asio::ssl::verify_none);
stream_.async_handshake( stream_.async_handshake(
boost::asio::ssl::stream_base::client, boost::asio::ssl::stream_base::client,
@@ -253,10 +284,12 @@ ConnectAttempt::onHandshake(error_code ec)
{ {
cancelTimer(); cancelTimer();
isIOInProgress_ = false;
if (ec) if (ec)
{ {
if (ec == boost::asio::error::operation_aborted) if (ec == boost::asio::error::operation_aborted)
return; return tryAsyncShutdown();
return fail("onHandshake", ec); return fail("onHandshake", ec);
} }
@@ -289,10 +322,13 @@ ConnectAttempt::onHandshake(error_code ec)
remote_endpoint_.address(), remote_endpoint_.address(),
app_); app_);
if (!socket_.is_open() || shutdown_) if (shutdown_)
return; return tryAsyncShutdown();
setTimer(); setTimer();
isIOInProgress_ = true;
boost::beast::http::async_write( boost::beast::http::async_write(
stream_, stream_,
req_, req_,
@@ -307,16 +343,21 @@ ConnectAttempt::onWrite(error_code ec)
{ {
cancelTimer(); cancelTimer();
isIOInProgress_ = false;
if (ec) if (ec)
{ {
if (ec == boost::asio::error::operation_aborted) if (ec == boost::asio::error::operation_aborted)
return; return tryAsyncShutdown();
return fail("onWrite", ec); return fail("onWrite", ec);
} }
if (!socket_.is_open() || shutdown_) if (shutdown_)
return; return tryAsyncShutdown();
setTimer();
isIOInProgress_ = true;
boost::beast::http::async_read( boost::beast::http::async_read(
stream_, stream_,
@@ -333,6 +374,8 @@ ConnectAttempt::onRead(error_code ec)
{ {
cancelTimer(); cancelTimer();
isIOInProgress_ = false;
if (ec) if (ec)
{ {
if (ec == boost::asio::error::eof) if (ec == boost::asio::error::eof)
@@ -342,13 +385,13 @@ ConnectAttempt::onRead(error_code ec)
} }
if (ec == boost::asio::error::operation_aborted) if (ec == boost::asio::error::operation_aborted)
return; return tryAsyncShutdown();
return fail("onRead", ec); return fail("onRead", ec);
} }
if (!socket_.is_open() || shutdown_) if (shutdown_)
return; return tryAsyncShutdown();
processResponse(); processResponse();
} }
@@ -453,9 +496,12 @@ ConnectAttempt::processResponse()
return fail(ss.str()); return fail(ss.str());
} }
if (!socket_.is_open() || shutdown_) if (!socket_.is_open())
return; return;
if (shutdown_)
return tryAsyncShutdown();
auto const peer = std::make_shared<PeerImp>( auto const peer = std::make_shared<PeerImp>(
app_, app_,
std::move(stream_ptr_), std::move(stream_ptr_),

View File

@@ -22,8 +22,6 @@
#include <xrpld/overlay/detail/OverlayImpl.h> #include <xrpld/overlay/detail/OverlayImpl.h>
#include <atomic>
namespace ripple { namespace ripple {
/** Manages an outbound connection attempt. */ /** Manages an outbound connection attempt. */
@@ -61,7 +59,9 @@ private:
response_type response_; response_type response_;
std::shared_ptr<PeerFinder::Slot> slot_; std::shared_ptr<PeerFinder::Slot> slot_;
request_type req_; request_type req_;
std::atomic<bool> shutdown_ = false; bool shutdown_ = false;
bool isIOInProgress_ = false;
bool shutdownStarted_ = false;
public: public:
ConnectAttempt( ConnectAttempt(
@@ -105,6 +105,8 @@ private:
void void
shutdown(); shutdown();
void void
tryAsyncShutdown();
void
onShutdown(error_code ec); onShutdown(error_code ec);
void void
close(); close();