adds graceful peer disconnection

This commit is contained in:
Vito
2025-08-12 19:02:56 +02:00
parent c9d73b6135
commit 076371746b
3 changed files with 131 additions and 109 deletions

View File

@@ -24,6 +24,8 @@
#include <xrpl/json/json_reader.h>
#include <chrono>
namespace ripple {
ConnectAttempt::ConnectAttempt(
@@ -94,13 +96,16 @@ ConnectAttempt::close()
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::ConnectAttempt::close : strand in this thread");
if (socket_.is_open())
{
error_code ec;
timer_.cancel(ec);
socket_.close(ec);
JLOG(journal_.debug()) << "Closed";
}
if (!socket_.is_open())
return;
cancelTimer();
error_code ec;
stream_.shutdown(ec);
socket_.close(ec);
JLOG(journal_.debug()) << "Closed";
}
void
@@ -120,13 +125,15 @@ ConnectAttempt::fail(std::string const& name, error_code ec)
void
ConnectAttempt::setTimer()
{
error_code ec;
timer_.expires_from_now(std::chrono::seconds(15), ec);
if (ec)
try
{
JLOG(journal_.error()) << "setTimer: " << ec.message();
return;
timer_.expires_after(std::chrono::seconds(15));
}
catch (std::exception const& ex)
{
JLOG(journal_.error()) << "setTimer: " << ex.what();
return;
};
timer_.async_wait(strand_.wrap(std::bind(
&ConnectAttempt::onTimer, shared_from_this(), std::placeholders::_1)));

View File

@@ -221,7 +221,6 @@ PeerImp::stop()
// outbound connections are under our control and may be logged
// at a higher level, but inbound connections are more numerous and
// uncontrolled so to prevent log flooding the severity is reduced.
//
if (inbound_)
{
JLOG(journal_.debug()) << "Stop";
@@ -243,8 +242,6 @@ PeerImp::send(std::shared_ptr<Message> const& m)
return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m));
if (gracefulClose_)
return;
if (detaching_)
return;
auto validator = m->getValidatorKey();
if (validator && !squelch_.expireSquelch(*validator))
@@ -578,21 +575,29 @@ PeerImp::close()
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::PeerImp::close : strand in this thread");
if (socket_.is_open())
// the socket is already closed; this may be due to concurrent calls to close()
if (!socket_.is_open())
return;
// gracefully shut down the SSL socket, performing a shutdown handshake
error_code ec;
timer_.cancel(ec);
stream_.shutdown(ec);
socket_.close(ec);
overlay_.incPeerDisconnect();
// The rationale for using different severity levels is that
// outbound connections are under our control and may be logged
// at a higher level, but inbound connections are more numerous and
// uncontrolled so to prevent log flooding the severity is reduced.
if (inbound_)
{
detaching_ = true; // DEPRECATED
error_code ec;
timer_.cancel(ec);
socket_.close(ec);
overlay_.incPeerDisconnect();
if (inbound_)
{
JLOG(journal_.debug()) << "Closed";
}
else
{
JLOG(journal_.info()) << "Closed";
}
JLOG(journal_.debug()) << "Closed";
}
else
{
JLOG(journal_.info()) << "Closed";
}
}
@@ -606,12 +611,14 @@ PeerImp::fail(std::string const& reason)
(void(Peer::*)(std::string const&)) & PeerImp::fail,
shared_from_this(),
reason));
if (journal_.active(beast::severities::kWarning) && socket_.is_open())
{
std::string const n = name();
JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n)
<< " failed: " << reason;
}
if (!socket_.is_open())
return;
std::string const n = name();
JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n)
<< " failed: " << reason;
close();
}
@@ -621,12 +628,15 @@ PeerImp::fail(std::string const& name, error_code ec)
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::PeerImp::fail : strand in this thread");
if (socket_.is_open())
{
JLOG(journal_.warn())
<< name << " from " << toBase58(TokenType::NodePublic, publicKey_)
<< " at " << remote_address_.to_string() << ": " << ec.message();
}
if (!socket_.is_open())
return;
JLOG(journal_.warn()) << name << " from "
<< toBase58(TokenType::NodePublic, publicKey_)
<< " at " << remote_address_.to_string() << ": "
<< ec.message();
close();
}
@@ -641,41 +651,35 @@ PeerImp::gracefulClose()
XRPL_ASSERT(
!gracefulClose_,
"ripple::PeerImp::gracefulClose : socket is not closing");
gracefulClose_ = true;
if (send_queue_.size() > 0)
return;
setTimer();
stream_.async_shutdown(bind_executor(
strand_,
std::bind(
&PeerImp::onShutdown, shared_from_this(), std::placeholders::_1)));
close();
}
/** Arms the peer timer and schedules the next onTimer callback.
 *
 * Sets `timer_` to expire `peerTimerInterval` from now and starts an
 * asynchronous wait whose completion is dispatched to `PeerImp::onTimer`
 * through `strand_`.
 *
 * If setting the expiry throws, the error is logged and the connection is
 * closed; no wait is scheduled in that case.
 */
void
PeerImp::setTimer()
{
    try
    {
        // expires_after reports failure by throwing rather than through an
        // error_code out-parameter.
        timer_.expires_after(peerTimerInterval);
    }
    catch (std::exception const& ex)
    {
        JLOG(journal_.error())
            << "setTimer: error expiring the timer: " << ex.what();
        return close();
    }

    // shared_from_this() is bound into the handler so the peer stays alive
    // while the wait is pending.
    timer_.async_wait(bind_executor(
        strand_,
        std::bind(
            &PeerImp::onTimer, shared_from_this(), std::placeholders::_1)));
}
// convenience for ignoring the error code
void
PeerImp::cancelTimer()
{
error_code ec;
timer_.cancel(ec);
}
//------------------------------------------------------------------------------
std::string
@@ -703,10 +707,7 @@ PeerImp::onTimer(error_code const& ec)
}
if (large_sendq_++ >= Tuning::sendqIntervals)
{
fail("Large send queue");
return;
}
return fail("Large send queue");
if (auto const t = tracking_.load(); !inbound_ && t != Tracking::converged)
{
@@ -723,17 +724,13 @@ PeerImp::onTimer(error_code const& ec)
(duration > app_.config().MAX_UNKNOWN_TIME)))
{
overlay_.peerFinder().on_failure(slot_);
fail("Not useful");
return;
return fail("Not useful");
}
}
// Already waiting for PONG
if (lastPingSeq_)
{
fail("Ping Timeout");
return;
}
return fail("Ping Timeout");
lastPingTime_ = clock_type::now();
lastPingSeq_ = rand_int<std::uint32_t>();
@@ -747,21 +744,6 @@ PeerImp::onTimer(error_code const& ec)
setTimer();
}
/** Completion handler for the asynchronous SSL shutdown.
 *
 * Cancels the timer, then closes the connection. Receiving `eof` is the
 * expected outcome; any other error is routed through `fail`, and a
 * success code is logged as unexpected before closing.
 *
 * @param ec Result of the shutdown operation.
 */
void
PeerImp::onShutdown(error_code ec)
{
    cancelTimer();

    // eof is what a completed shutdown is expected to report.
    if (ec == boost::asio::error::eof)
        return close();

    // Any other error is a genuine failure.
    if (ec)
        return fail("onShutdown", ec);

    // No error at all: we expected eof, so something went wrong.
    JLOG(journal_.error()) << "onShutdown: expected error condition";
    close();
}
//------------------------------------------------------------------------------
void
PeerImp::doAccept()
@@ -898,8 +880,10 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
JLOG(journal_.info()) << "EOF";
return gracefulClose();
}
if (ec)
return fail("onReadMessage", ec);
if (auto stream = journal_.trace())
{
if (bytes_transferred > 0)
@@ -927,12 +911,12 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
350ms,
journal_);
if (ec)
return fail("onReadMessage", ec);
if (!socket_.is_open())
return;
if (gracefulClose_)
return;
if (ec)
return fail("onReadMessage", ec);
if (bytes_consumed == 0)
break;
read_buffer_.consume(bytes_consumed);
@@ -959,6 +943,7 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
return;
if (ec)
return fail("onWriteMessage", ec);
if (auto stream = journal_.trace())
{
if (bytes_transferred > 0)
@@ -989,15 +974,11 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
std::placeholders::_2)));
}
// Graceful close was initiated, but it was postponed to allow the server to
// finish sending messages to the peer. If we reach this point, all queued
// messages have been sent, so finish closing the connection.
if (gracefulClose_)
{
return stream_.async_shutdown(bind_executor(
strand_,
std::bind(
&PeerImp::onShutdown,
shared_from_this(),
std::placeholders::_1)));
}
close();
}
//------------------------------------------------------------------------------

View File

@@ -95,7 +95,6 @@ private:
std::atomic<Tracking> tracking_;
clock_type::time_point trackingTime_;
bool detaching_ = false;
// Node public key of peer.
PublicKey const publicKey_;
std::string name_;
@@ -425,9 +424,6 @@ public:
bool
isHighLatency() const override;
void
fail(std::string const& reason);
bool
compressionEnabled() const override
{
@@ -441,21 +437,63 @@ public:
}
private:
void
close();
/** @brief Closes the connection and logs the reason for failure.
*
* @param name A string identifying the operation or context of the failure.
* @param ec The `error_code` associated with the failure.
* @note This operation is idempotent; calling it on an already closed
* socket has no effect.
* @note Must be called on the peer's strand.
*/
void
fail(std::string const& name, error_code ec);
/** @brief Closes the connection and logs a descriptive reason.
*
* @param reason A human-readable string explaining why the peer connection
* is being terminated.
* @note This operation is idempotent; calling it on an already closed
* socket has no effect.
* @note Must be called on the peer's strand.
*/
void
fail(std::string const& reason);
/** @brief Forcibly terminates the peer connection and performs cleanup.
 *
 * This function terminates the peer connection. It cancels the pending
 * timer, performs the SSL shutdown on the stream, closes the underlying
 * network socket and increments the overlay's peer-disconnect counter.
 *
 * The SSL shutdown is issued with the synchronous `shutdown` call, and any
 * error reported by the cancel, shutdown or close steps is ignored.
 *
 * The closure is logged at debug severity for inbound peers and at info
 * severity for outbound peers.
 *
 * @note This operation is idempotent; it's safe to call on a connection
 * that is already closed (the function returns early when the socket is
 * not open).
 * @note Must be called on the peer's strand.
 */
void
close();
/** @brief Initiates a graceful shutdown of the peer connection.
 *
 * This function marks the connection for closure. A "graceful" close
 * ensures that any messages already queued for sending are transmitted
 * before the underlying socket is closed. The connection may still be
 * terminated forcefully if the remote server stopped reading the messages.
 *
 * If the send queue is empty, the connection is closed immediately. If
 * messages are still pending, the actual socket closure is deferred until
 * the send queue is drained by the I/O processing logic.
 *
 * @note Must be called at most once per connection: the implementation
 * asserts that a graceful close has not already been initiated.
 * NOTE(review): behavior when the socket is already closed is not visible
 * from the declaration — confirm against the definition.
 * @note Must be called on the peer's strand.
 */
void
gracefulClose();
void
setTimer();
void
cancelTimer();
static std::string
makePrefix(id_t id);
@@ -463,10 +501,6 @@ private:
void
onTimer(boost::system::error_code const& ec);
// Called when SSL shutdown completes
void
onShutdown(error_code ec);
void
doAccept();