W.I.P. correct async shutdown

This commit is contained in:
Vito
2025-08-20 11:20:40 +02:00
parent 11152e39e6
commit 3d07dba2ce
4 changed files with 148 additions and 42 deletions

View File

@@ -104,20 +104,18 @@ ConnectAttempt::shutdown()
return;
shutdown_ = true;
stream_.next_layer().cancel();
setTimer();
close();
// setTimer();
error_code ec;
// cancel asynchronous I/O
socket_.cancel(ec);
// gracefully shutdown the SSL socket, performing a shutdown handshake
stream_.async_shutdown(bind_executor(
strand_,
std::bind(
&ConnectAttempt::onShutdown,
shared_from_this(),
std::placeholders::_1)));
// // gracefully shutdown the SSL socket, performing a shutdown handshake
// stream_.async_shutdown(bind_executor(
// strand_,
// std::bind(
// &ConnectAttempt::onShutdown,
// shared_from_this(),
// std::placeholders::_1)));
}
void
@@ -221,9 +219,6 @@ ConnectAttempt::onConnect(error_code ec)
{
cancelTimer();
if (!socket_.is_open())
return;
if (ec)
{
if (ec == boost::asio::error::operation_aborted)
@@ -239,6 +234,9 @@ ConnectAttempt::onConnect(error_code ec)
JLOG(journal_.trace()) << "onConnect";
if (!socket_.is_open() || shutdown_)
return;
setTimer();
stream_.set_verify_mode(boost::asio::ssl::verify_none);
@@ -255,9 +253,6 @@ ConnectAttempt::onHandshake(error_code ec)
{
cancelTimer();
if (!socket_.is_open())
return;
if (ec)
{
if (ec == boost::asio::error::operation_aborted)
@@ -294,6 +289,9 @@ ConnectAttempt::onHandshake(error_code ec)
remote_endpoint_.address(),
app_);
if (!socket_.is_open() || shutdown_)
return;
setTimer();
boost::beast::http::async_write(
stream_,
@@ -308,8 +306,6 @@ void
ConnectAttempt::onWrite(error_code ec)
{
cancelTimer();
if (!socket_.is_open())
return;
if (ec)
{
@@ -319,6 +315,9 @@ ConnectAttempt::onWrite(error_code ec)
return fail("onWrite", ec);
}
if (!socket_.is_open() || shutdown_)
return;
boost::beast::http::async_read(
stream_,
read_buf_,
@@ -334,9 +333,6 @@ ConnectAttempt::onRead(error_code ec)
{
cancelTimer();
if (!socket_.is_open())
return;
if (ec)
{
if (ec == boost::asio::error::eof)
@@ -351,6 +347,9 @@ ConnectAttempt::onRead(error_code ec)
return fail("onRead", ec);
}
if (!socket_.is_open() || shutdown_)
return;
processResponse();
}
@@ -454,6 +453,9 @@ ConnectAttempt::processResponse()
return fail(ss.str());
}
if (!socket_.is_open() || shutdown_)
return;
auto const peer = std::make_shared<PeerImp>(
app_,
std::move(stream_ptr_),

View File

@@ -22,6 +22,8 @@
#include <xrpld/overlay/detail/OverlayImpl.h>
#include <atomic>
namespace ripple {
/** Manages an outbound connection attempt. */
@@ -59,7 +61,7 @@ private:
response_type response_;
std::shared_ptr<PeerFinder::Slot> slot_;
request_type req_;
bool shutdown_ = false;
std::atomic<bool> shutdown_ = false;
public:
ConnectAttempt(

View File

@@ -43,6 +43,7 @@
#include <boost/algorithm/string/predicate.hpp>
#include <boost/asio/error.hpp>
#include <boost/beast/core/ostream.hpp>
#include <boost/beast/core/stream_traits.hpp>
#include <algorithm>
#include <memory>
@@ -237,13 +238,13 @@ PeerImp::send(std::shared_ptr<Message> const& m)
if (!strand_.running_in_this_thread())
return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m));
// we are in progress of closing the connection
if (shutdown_)
return;
if (!socket_.is_open())
return;
// we are in progress of closing the connection
if (shutdown_)
return tryAsyncShutdown();
auto validator = m->getValidatorKey();
if (validator && !squelch_.expireSquelch(*validator))
{
@@ -614,33 +615,86 @@ PeerImp::fail(std::string const& reason)
}
void
PeerImp::shutdown()
PeerImp::tryAsyncShutdown()
{
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::PeerImp::shutdown: strand in this thread");
if (!strand_.running_in_this_thread())
return post(
strand_, std::bind(&PeerImp::tryAsyncShutdown, shared_from_this()));
if (!socket_.is_open() || shutdown_)
// XRPL_ASSERT(
// strand_.running_in_this_thread(),
// "ripple::PeerImp::shutdown: strand in this thread");
if (!shutdown_ || shutdownStarted_)
return;
shutdown_ = true;
error_code ec;
// cancel asynchronous I/O
socket_.cancel(ec);
if (readInProgress_ || writeInProgress_)
{
JLOG(journal_.debug())
<< "tryAsyncShutdown: waiting - read: " << readInProgress_
<< " write: " << writeInProgress_;
return;
}
shutdownStarted_ = true;
XRPL_ASSERT(
!readInProgress_ && !writeInProgress_,
"ripple::PeerImp::shutdown: read and write not in progress");
JLOG(journal_.debug()) << "tryAsyncShutdown: read: " << readInProgress_
<< " write: " << writeInProgress_
<< " shutdown: " << shutdown_
<< " shutdownInProgress: " << shutdownStarted_;
setTimer();
// gracefully shutdown the SSL socket, performing a shutdown handshake
stream_.async_shutdown(bind_executor(
strand_,
std::bind(
&PeerImp::onShutdown, shared_from_this(), std::placeholders::_1)));
// stream_.async_shutdown([this](error_code ec) {
// cancelTimer();
// close();
// });
}
void
PeerImp::shutdown()
{
// XRPL_ASSERT(
// strand_.running_in_this_thread(),
// "ripple::PeerImp::shutdown: strand in this thread");
if (!strand_.running_in_this_thread())
return post(strand_, std::bind(&PeerImp::shutdown, shared_from_this()));
if (!socket_.is_open())
return;
shutdown_ = true;
JLOG(journal_.debug()) << "shutdown";
// cancel asynchronous I/O
// error_code ec;
// auto ret = socket_.cancel(ec);
boost::beast::get_lowest_layer(stream_).cancel();
// if (ec)
// {
// JLOG(journal_.debug()) << "shutdown: cancel err: " << ec.what();
// }
// if (ret)
// {
// JLOG(journal_.debug()) << "shutdown: cancel err: " << ret.what();
// }
tryAsyncShutdown();
};
void
PeerImp::onShutdown(error_code ec)
{
cancelTimer();
JLOG(journal_.debug()) << "onShutdown";
if (ec)
{
// - eof: the stream was cleanly closed
@@ -795,6 +849,9 @@ PeerImp::doAccept()
"ripple::PeerImp::doAccept : empty read buffer");
JLOG(journal_.debug()) << "doAccept: " << remote_address_;
// a shutdown was initiated before the handshare, there is nothing to do
if (shutdown_)
return tryAsyncShutdown();
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
@@ -875,6 +932,7 @@ PeerImp::domain() const
void
PeerImp::doProtocolStart()
{
// a shutdown was initiated before the handshare, there is nothing to do
onReadMessage(error_code(), 0);
// Send all the validator lists that have been loaded
@@ -913,6 +971,12 @@ PeerImp::doProtocolStart()
void
PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
{
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::PeerImp::onReadMessage : strand in this thread");
readInProgress_ = false;
if (!socket_.is_open())
return;
@@ -925,7 +989,11 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
}
if (ec == boost::asio::error::operation_aborted)
return;
{
JLOG(journal_.debug()) << "onReadMessage: aborted";
return tryAsyncShutdown();
}
return fail("onReadMessage", ec);
}
@@ -957,7 +1025,7 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
350ms,
journal_);
if (!socket_.is_open() || shutdown_)
if (!socket_.is_open())
return;
// the error_code is produced by invokeProtocolMessage
@@ -967,9 +1035,23 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
if (bytes_consumed == 0)
break;
read_buffer_.consume(bytes_consumed);
}
if (shutdown_)
return tryAsyncShutdown();
readInProgress_ = true;
XRPL_ASSERT(
!shutdownStarted_, "ripple::PeerImp::onReadMessage : shutdown started");
JLOG(journal_.debug()) << "onReadMessage: read: " << readInProgress_
<< " write: " << writeInProgress_
<< " shutdown: " << shutdown_
<< " shutdownInProgress: " << shutdownStarted_;
// Timeout on writes only
stream_.async_read_some(
read_buffer_.prepare(std::max(Tuning::readBufferBytes, hint)),
@@ -985,13 +1067,21 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
void
PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
{
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::PeerImp::fail : strand in this thread");
writeInProgress_ = false;
if (!socket_.is_open())
return;
if (ec)
{
if (ec == boost::asio::error::operation_aborted)
return;
{
JLOG(journal_.debug()) << "onWriteMessage: aborted";
return tryAsyncShutdown();
}
return fail("onWriteMessage", ec);
}
@@ -1010,8 +1100,13 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
!send_queue_.empty(),
"ripple::PeerImp::onWriteMessage : non-empty send buffer");
send_queue_.pop();
if (shutdown_)
return tryAsyncShutdown();
if (!send_queue_.empty())
{
writeInProgress_ = true;
// Timeout on writes only
return boost::asio::async_write(
stream_,

View File

@@ -40,6 +40,7 @@
#include <boost/endian/conversion.hpp>
#include <boost/thread/shared_mutex.hpp>
#include <atomic>
#include <cstdint>
#include <optional>
#include <queue>
@@ -174,7 +175,10 @@ private:
http_response_type response_;
boost::beast::http::fields const& headers_;
std::queue<std::shared_ptr<Message>> send_queue_;
bool shutdown_ = false;
std::atomic<bool> shutdown_ = false;
std::atomic<bool> shutdownStarted_ = false;
std::atomic<bool> readInProgress_ = false;
std::atomic<bool> writeInProgress_ = false;
int large_sendq_ = 0;
std::unique_ptr<LoadEvent> load_event_;
// The highest sequence of each PublisherList that has
@@ -484,6 +488,9 @@ private:
*/
void
shutdown();
void
tryAsyncShutdown();
/**
* @brief Handles the completion of the asynchronous SSL shutdown.
*