adds graceful shutdown for outbound connection attempt

This commit is contained in:
Vito
2025-08-19 13:35:10 +02:00
parent 491bc19f5d
commit 7e41aaeffb
2 changed files with 132 additions and 61 deletions

View File

@@ -25,6 +25,7 @@
#include <xrpl/json/json_reader.h>
#include <chrono>
#include <sstream>
namespace ripple {
@@ -70,11 +71,13 @@ ConnectAttempt::stop()
if (!strand_.running_in_this_thread())
return strand_.post(
std::bind(&ConnectAttempt::stop, shared_from_this()));
if (socket_.is_open())
{
JLOG(journal_.debug()) << "Stop";
}
close();
if (!socket_.is_open())
return;
JLOG(journal_.debug()) << "Stop";
shutdown();
}
void
@@ -90,6 +93,55 @@ ConnectAttempt::run()
//------------------------------------------------------------------------------
void
ConnectAttempt::shutdown()
{
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::ConnectAttempt::shutdown: strand in this thread");
if (!socket_.is_open() || shutdown_)
return;
shutdown_ = true;
setTimer();
error_code ec;
// cancel asynchronous I/O
socket_.cancel(ec);
// gracefully shutdown the SSL socket, performing a shutdown handshake
stream_.async_shutdown(bind_executor(
strand_,
std::bind(
&ConnectAttempt::onShutdown,
shared_from_this(),
std::placeholders::_1)));
}
void
ConnectAttempt::onShutdown(error_code ec)
{
cancelTimer();
if (ec)
{
// - eof: the stream was cleanly closed
// - operation_aborted: an expired timer (slow shutdown)
// - stream_truncated: the tcp connection closed (no handshake) it could
// occur if a peer does not perform a graceful disconnect
// - broken_pipe: the peer is gone
if (ec != boost::asio::error::eof &&
ec != boost::asio::error::operation_aborted)
{
JLOG(journal_.warn()) << "onShutdown: " << ec.message();
}
}
close();
}
void
ConnectAttempt::close()
{
@@ -104,21 +156,21 @@ ConnectAttempt::close()
error_code ec;
socket_.close(ec);
JLOG(journal_.debug()) << "Closed";
JLOG(journal_.debug()) << "close: Closed";
}
void
ConnectAttempt::fail(std::string const& reason)
{
JLOG(journal_.debug()) << reason;
close();
shutdown();
}
void
ConnectAttempt::fail(std::string const& name, error_code ec)
{
JLOG(journal_.debug()) << name << ": " << ec.message();
close();
shutdown();
}
void
@@ -150,14 +202,17 @@ ConnectAttempt::onTimer(error_code ec)
{
if (!socket_.is_open())
return;
if (ec == boost::asio::error::operation_aborted)
return;
if (ec)
{
if (ec == boost::asio::error::operation_aborted)
return;
// This should never happen
JLOG(journal_.error()) << "onTimer: " << ec.message();
return close();
}
fail("Timeout");
}
@@ -166,18 +221,26 @@ ConnectAttempt::onConnect(error_code ec)
{
cancelTimer();
if (ec == boost::asio::error::operation_aborted)
return;
endpoint_type local_endpoint;
if (!ec)
local_endpoint = socket_.local_endpoint(ec);
if (ec)
return fail("onConnect", ec);
if (!socket_.is_open())
return;
if (ec)
{
if (ec == boost::asio::error::operation_aborted)
return;
return fail("onConnect", ec);
}
// check if connection has really been established
socket_.local_endpoint(ec);
if (ec)
return fail("onConnect", ec);
JLOG(journal_.trace()) << "onConnect";
setTimer();
stream_.set_verify_mode(boost::asio::ssl::verify_none);
stream_.async_handshake(
boost::asio::ssl::stream_base::client,
@@ -191,24 +254,30 @@ void
ConnectAttempt::onHandshake(error_code ec)
{
cancelTimer();
if (!socket_.is_open())
return;
if (ec == boost::asio::error::operation_aborted)
return;
endpoint_type local_endpoint;
if (!ec)
local_endpoint = socket_.local_endpoint(ec);
if (ec)
{
if (ec == boost::asio::error::operation_aborted)
return;
return fail("onHandshake", ec);
}
endpoint_type local_endpoint = socket_.local_endpoint(ec);
if (ec)
return fail("onHandshake", ec);
JLOG(journal_.trace()) << "onHandshake";
// check if we connected to ourselves
if (!overlay_.peerFinder().onConnected(
slot_, beast::IPAddressConversion::from_asio(local_endpoint)))
return fail("Duplicate connection");
return fail("Self connection");
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
if (!sharedValue)
return close(); // makeSharedValue logs
return shutdown(); // makeSharedValue logs
req_ = makeRequest(
!overlay_.peerFinder().config().peerPrivate,
@@ -241,10 +310,15 @@ ConnectAttempt::onWrite(error_code ec)
cancelTimer();
if (!socket_.is_open())
return;
if (ec == boost::asio::error::operation_aborted)
return;
if (ec)
{
if (ec == boost::asio::error::operation_aborted)
return;
return fail("onWrite", ec);
}
boost::beast::http::async_read(
stream_,
read_buf_,
@@ -262,34 +336,22 @@ ConnectAttempt::onRead(error_code ec)
if (!socket_.is_open())
return;
if (ec == boost::asio::error::operation_aborted)
return;
if (ec == boost::asio::error::eof)
{
JLOG(journal_.info()) << "EOF";
setTimer();
return stream_.async_shutdown(strand_.wrap(std::bind(
&ConnectAttempt::onShutdown,
shared_from_this(),
std::placeholders::_1)));
}
if (ec)
return fail("onRead", ec);
processResponse();
}
void
ConnectAttempt::onShutdown(error_code ec)
{
cancelTimer();
if (!ec)
if (ec)
{
JLOG(journal_.error()) << "onShutdown: expected error condition";
return close();
if (ec == boost::asio::error::eof)
{
JLOG(journal_.debug()) << "EOF";
return shutdown();
}
if (ec == boost::asio::error::operation_aborted)
return;
return fail("onRead", ec);
}
if (ec != boost::asio::error::eof)
return fail("onShutdown", ec);
close();
processResponse();
}
//--------------------------------------------------------------------------
@@ -338,7 +400,7 @@ ConnectAttempt::processResponse()
JLOG(journal_.info())
<< "Unable to upgrade to peer protocol: " << response_.result()
<< " (" << response_.reason() << ")";
return close();
return shutdown();
}
// Just because our peer selected a particular protocol version doesn't
@@ -358,7 +420,7 @@ ConnectAttempt::processResponse()
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
if (!sharedValue)
return close(); // makeSharedValue logs
return shutdown(); // makeSharedValue logs
try
{
@@ -385,7 +447,12 @@ ConnectAttempt::processResponse()
auto const result = overlay_.peerFinder().activate(
slot_, publicKey, static_cast<bool>(member));
if (result != PeerFinder::Result::success)
return fail("Outbound " + std::string(to_string(result)));
{
std::stringstream ss;
ss << "Outbound Connect Attempt " << remote_endpoint_ << " "
<< to_string(result);
return fail(ss.str());
}
auto const peer = std::make_shared<PeerImp>(
app_,

View File

@@ -59,6 +59,7 @@ private:
response_type response_;
std::shared_ptr<PeerFinder::Slot> slot_;
request_type req_;
bool shutdown_ = false;
public:
ConnectAttempt(
@@ -81,12 +82,6 @@ public:
run();
private:
void
close();
void
fail(std::string const& reason);
void
fail(std::string const& name, error_code ec);
void
setTimer();
void
@@ -102,7 +97,16 @@ private:
void
onRead(error_code ec);
void
fail(std::string const& reason);
void
fail(std::string const& name, error_code ec);
void
shutdown();
void
onShutdown(error_code ec);
void
close();
void
processResponse();