Compare commits


2 Commits

Author SHA1 Message Date
Vito a15abd4067 attempt to appease formatting gods 2025-09-02 15:08:57 +02:00
Vito d9695be838 refactor(overlay): Overhaul peer disconnection logic
This commit refactors the peer shutdown and failure handling
mechanism to be more robust, consistent, and communicative.

The previous implementation used raw strings to represent error reasons
and did not communicate these reasons to peers when shutting down a connection.

With this change, disconnections are now explicitly communicated via a
`TMClose` protocol message carrying a strongly typed reason. This new
approach provides better diagnostics and makes the peer disconnection
process more stable and predictable.
2025-09-02 15:00:12 +02:00
6 changed files with 127 additions and 87 deletions
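
For readers skimming the diff: the heart of the change is that a peer which
decides to drop a connection now first sends a close frame carrying a typed
reason instead of silently tearing down the socket. The standalone C++ sketch
below models that pattern with hypothetical names (CloseReason, MockPeer,
sendAndClose); it is not the rippled/PeerImp API, which uses the
protobuf-generated protocol::TMCloseReason, protocol::TMClose, and the new
mtCLOSE message type shown in the diffs that follow.

// Illustrative sketch only (not the rippled/PeerImp implementation): a
// standalone mock of the "typed close reason" pattern. CloseReason, MockPeer
// and sendAndClose are hypothetical names invented for this example.
#include <cstdint>
#include <iostream>
#include <string>

// Strongly typed disconnect reasons, in the spirit of TMCloseReason.
enum class CloseReason : std::uint8_t {
    Resource,
    LargeSendQueue,
    NotUseful,
    PingTimeout,
    Internal,
    Shutdown,
};

std::string
to_string(CloseReason reason)
{
    switch (reason)
    {
        case CloseReason::Resource:
            return "Too Many P2P Messages";
        case CloseReason::LargeSendQueue:
            return "Large Send Queue";
        case CloseReason::NotUseful:
            return "Peer Not Useful";
        case CloseReason::PingTimeout:
            return "Ping Timeout";
        case CloseReason::Internal:
            return "Internal Error";
        case CloseReason::Shutdown:
            return "Shutdown";
    }
    return "unknown";
}

// A mock peer connection: instead of silently dropping the socket, it first
// emits a close frame telling the remote side why it is being disconnected.
class MockPeer
{
public:
    void
    sendAndClose(CloseReason reason)
    {
        if (shutdown_)  // already closing: never send a second close frame
            return;
        shutdown_ = true;
        std::cout << "-> CLOSE(" << to_string(reason) << ")\n";
        close();
    }

private:
    void
    close()
    {
        std::cout << "socket closed\n";
    }

    bool shutdown_ = false;
};

int
main()
{
    MockPeer peer;
    peer.sendAndClose(CloseReason::PingTimeout);
    peer.sendAndClose(CloseReason::Shutdown);  // ignored: already shutting down
}

In the actual change below, sendAndClose() additionally trims the send queue
to the message currently in flight before queuing the TMClose, and
onWriteMessage() closes the socket once the queue drains while shutdown_ is
set.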

View File

@@ -26,6 +26,7 @@ enum MessageType {
mtREPLAY_DELTA_RESPONSE = 60;
mtHAVE_TRANSACTIONS = 63;
mtTRANSACTIONS = 64;
mtCLOSE = 65;
}
// token, iterations, target, challenge = issue demand for proof of work
@@ -341,3 +342,19 @@ message TMReplayDeltaResponse {
message TMHaveTransactions {
repeated bytes hashes = 1;
}
enum TMCloseReason {
crRESOURCE = 1;
crINVALID_CLOSED_LEDGER = 2;
crINVALID_PREV_LEDGER = 3;
crBAD_LEDGER_HEADERS = 4;
crLARGE_SEND_QUEUE = 5;
crNOT_USEFUL = 6;
crPING_TIMEOUT = 7;
crINTERNAL = 8;
crSHUTDOWN = 9;
}
message TMClose {
required TMCloseReason reason = 1;
}

View File

@@ -64,6 +64,34 @@ std::chrono::seconds constexpr peerTimerInterval{60};
// TODO: Remove this exclusion once unit tests are added after the hotfix
// release.
std::string
to_string(protocol::TMCloseReason reason)
{
switch (reason)
{
case protocol::crRESOURCE:
return "Too Many P2P Messages";
case protocol::crINVALID_CLOSED_LEDGER:
return "Invalid Closed Ledger Header";
case protocol::crINVALID_PREV_LEDGER:
return "Invalid Previous Ledger Header";
case protocol::crBAD_LEDGER_HEADERS:
return "Bad Ledger Headers";
case protocol::crLARGE_SEND_QUEUE:
return "Large Send Queue";
case protocol::crNOT_USEFUL:
return "Peer Not Useful";
case protocol::crPING_TIMEOUT:
return "Ping Timeout";
case protocol::crINTERNAL:
return "Internal Error";
case protocol::crSHUTDOWN:
return "Shutdown";
}
return "unknown";
}
PeerImp::PeerImp(
Application& app,
id_t id,
@@ -178,7 +206,7 @@ PeerImp::run()
closed = parseLedgerHash(iter->value());
if (!closed)
fail("Malformed handshake data (1)");
fail(protocol::TMCloseReason::crINVALID_CLOSED_LEDGER);
}
if (auto const iter = headers_.find("Previous-Ledger");
@@ -187,11 +215,11 @@ PeerImp::run()
previous = parseLedgerHash(iter->value());
if (!previous)
fail("Malformed handshake data (2)");
fail(protocol::TMCloseReason::crINVALID_PREV_LEDGER);
}
if (previous && !closed)
fail("Malformed handshake data (3)");
fail(protocol::TMCloseReason::crBAD_LEDGER_HEADERS);
{
std::lock_guard<std::mutex> sl(recentLock_);
@@ -231,7 +259,8 @@ PeerImp::stop()
JLOG(journal_.info()) << "Stop";
}
}
close();
sendAndClose(protocol::TMCloseReason::crSHUTDOWN);
}
//------------------------------------------------------------------------------
@@ -241,10 +270,6 @@ PeerImp::send(std::shared_ptr<Message> const& m)
{
if (!strand_.running_in_this_thread())
return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m));
if (gracefulClose_)
return;
if (detaching_)
return;
auto validator = m->getValidatorKey();
if (validator && !squelch_.expireSquelch(*validator))
@@ -356,7 +381,7 @@ PeerImp::charge(Resource::Charge const& fee, std::string const& context)
{
// Sever the connection
overlay_.incPeerDisconnectCharges();
fail("charge: Resources");
fail(protocol::TMCloseReason::crRESOURCE);
}
}
@@ -580,7 +605,6 @@ PeerImp::close()
"ripple::PeerImp::close : strand in this thread");
if (socket_.is_open())
{
detaching_ = true; // DEPRECATED
try
{
timer_.cancel();
@@ -604,22 +628,25 @@ PeerImp::close()
}
void
PeerImp::fail(std::string const& reason)
PeerImp::fail(protocol::TMCloseReason reason)
{
if (!strand_.running_in_this_thread())
return post(
strand_,
std::bind(
(void(Peer::*)(std::string const&)) & PeerImp::fail,
(void(Peer::*)(protocol::TMCloseReason)) & PeerImp::fail,
shared_from_this(),
reason));
if (journal_.active(beast::severities::kWarning) && socket_.is_open())
if (journal_.active(beast::severities::kWarning) && socket_.is_open() &&
reason != protocol::TMCloseReason::crINTERNAL)
{
std::string const n = name();
JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n)
<< " failed: " << reason;
<< " failed: " << to_string(reason);
}
close();
sendAndClose(reason);
}
void
@@ -634,28 +661,28 @@ PeerImp::fail(std::string const& name, error_code ec)
<< name << " from " << toBase58(TokenType::NodePublic, publicKey_)
<< " at " << remote_address_.to_string() << ": " << ec.message();
}
close();
sendAndClose(protocol::TMCloseReason::crINTERNAL);
}
void
PeerImp::gracefulClose()
PeerImp::sendAndClose(protocol::TMCloseReason reason)
{
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::PeerImp::gracefulClose : strand in this thread");
XRPL_ASSERT(
socket_.is_open(), "ripple::PeerImp::gracefulClose : socket is open");
XRPL_ASSERT(
!gracefulClose_,
"ripple::PeerImp::gracefulClose : socket is not closing");
gracefulClose_ = true;
if (send_queue_.size() > 0)
if (shutdown_)
return;
setTimer();
stream_.async_shutdown(bind_executor(
strand_,
std::bind(
&PeerImp::onShutdown, shared_from_this(), std::placeholders::_1)));
// erase all outstanding messages except for the one
// currently being executed
if (send_queue_.size() > 1)
{
decltype(send_queue_) q({send_queue_.front()});
send_queue_.swap(q);
}
shutdown_ = true;
protocol::TMClose tmGC;
tmGC.set_reason(reason);
send(std::make_shared<Message>(tmGC, protocol::mtCLOSE));
}
void
@@ -713,14 +740,11 @@ PeerImp::onTimer(error_code const& ec)
{
// This should never happen
JLOG(journal_.error()) << "onTimer: " << ec.message();
return close();
return fail(protocol::TMCloseReason::crINTERNAL);
}
if (large_sendq_++ >= Tuning::sendqIntervals)
{
fail("Large send queue");
return;
}
return fail(protocol::TMCloseReason::crLARGE_SEND_QUEUE);
if (auto const t = tracking_.load(); !inbound_ && t != Tracking::converged)
{
@@ -737,17 +761,13 @@ PeerImp::onTimer(error_code const& ec)
(duration > app_.config().MAX_UNKNOWN_TIME)))
{
overlay_.peerFinder().on_failure(slot_);
fail("Not useful");
return;
return fail(protocol::TMCloseReason::crNOT_USEFUL);
}
}
// Already waiting for PONG
if (lastPingSeq_)
{
fail("Ping Timeout");
return;
}
return fail(protocol::TMCloseReason::crPING_TIMEOUT);
lastPingTime_ = clock_type::now();
lastPingSeq_ = rand_int<std::uint32_t>();
@@ -761,21 +781,6 @@ PeerImp::onTimer(error_code const& ec)
setTimer();
}
void
PeerImp::onShutdown(error_code ec)
{
cancelTimer();
// If we don't get eof then something went wrong
if (!ec)
{
JLOG(journal_.error()) << "onShutdown: expected error condition";
return close();
}
if (ec != boost::asio::error::eof)
return fail("onShutdown", ec);
close();
}
//------------------------------------------------------------------------------
void
PeerImp::doAccept()
@@ -791,7 +796,10 @@ PeerImp::doAccept()
// This shouldn't fail since we already computed
// the shared value successfully in OverlayImpl
if (!sharedValue)
return fail("makeSharedValue: Unexpected failure");
{
JLOG(journal_.error()) << "doAccept: makeSharedValue failed";
return fail(protocol::TMCloseReason::crINTERNAL);
}
JLOG(journal_.info()) << "Protocol: " << to_string(protocol_);
JLOG(journal_.info()) << "Public Key: "
@@ -841,7 +849,9 @@ PeerImp::doAccept()
return fail("onWriteResponse", ec);
if (write_buffer->size() == bytes_transferred)
return doProtocolStart();
return fail("Failed to write header");
JLOG(journal_.error()) << "Failed to write header";
return fail(protocol::TMCloseReason::crINTERNAL);
}));
}
@@ -905,15 +915,19 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
{
if (!socket_.is_open())
return;
if (ec == boost::asio::error::operation_aborted)
// we started closing the local connection, stop reading
if (ec == boost::asio::error::operation_aborted || shutdown_)
return;
if (ec == boost::asio::error::eof)
{
// Peer initiated connection close, just clean up
JLOG(journal_.info()) << "EOF";
return gracefulClose();
return close();
}
if (ec)
return fail("onReadMessage", ec);
if (auto stream = journal_.trace())
{
if (bytes_transferred > 0)
@@ -945,8 +959,6 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
return fail("onReadMessage", ec);
if (!socket_.is_open())
return;
if (gracefulClose_)
return;
if (bytes_consumed == 0)
break;
read_buffer_.consume(bytes_consumed);
@@ -969,6 +981,7 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
{
if (!socket_.is_open())
return;
if (ec == boost::asio::error::operation_aborted)
return;
if (ec)
@@ -1002,16 +1015,9 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
std::placeholders::_1,
std::placeholders::_2)));
}
if (gracefulClose_)
{
return stream_.async_shutdown(bind_executor(
strand_,
std::bind(
&PeerImp::onShutdown,
shared_from_this(),
std::placeholders::_1)));
}
// If the send queue is empty and we are shutting down, close the connection
else if (shutdown_)
close();
}
//------------------------------------------------------------------------------
@@ -2746,6 +2752,20 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
<< "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
}
void
PeerImp::onMessage(std::shared_ptr<protocol::TMClose> const& m)
{
if (m->has_reason())
{
JLOG(p_journal_.debug())
<< "onMessage: TMClose: peer closed the connection: "
<< to_string(m->reason());
}
// do not send a close message when the peer initiates the shutdown
close();
}
//--------------------------------------------------------------------------
void

View File

@@ -95,7 +95,7 @@ private:
std::atomic<Tracking> tracking_;
clock_type::time_point trackingTime_;
bool detaching_ = false;
bool shutdown_ = false;
// Node public key of peer.
PublicKey const publicKey_;
std::string name_;
@@ -175,7 +175,6 @@ private:
http_response_type response_;
boost::beast::http::fields const& headers_;
std::queue<std::shared_ptr<Message>> send_queue_;
bool gracefulClose_ = false;
int large_sendq_ = 0;
std::unique_ptr<LoadEvent> load_event_;
// The highest sequence of each PublisherList that has
@@ -426,7 +425,7 @@ public:
isHighLatency() const override;
void
fail(std::string const& reason);
fail(protocol::TMCloseReason reason);
bool
compressionEnabled() const override
@@ -445,10 +444,10 @@ private:
close();
void
fail(std::string const& name, error_code ec);
sendAndClose(protocol::TMCloseReason reason);
void
gracefulClose();
fail(std::string const& name, error_code ec);
void
setTimer();
@@ -463,10 +462,6 @@ private:
void
onTimer(boost::system::error_code const& ec);
// Called when SSL shutdown completes
void
onShutdown(error_code ec);
void
doAccept();
@@ -584,6 +579,8 @@ public:
onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m);
void
onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m);
void
onMessage(std::shared_ptr<protocol::TMClose> const& m);
private:
//--------------------------------------------------------------------------

View File

@@ -104,8 +104,8 @@ protocolMessageName(int type)
return "replay_delta_request";
case protocol::mtREPLAY_DELTA_RESPONSE:
return "replay_delta_response";
default:
break;
case protocol::mtCLOSE:
return "close";
}
return "unknown";
}
@@ -470,6 +470,10 @@ invokeProtocolMessage(
success = detail::invoke<protocol::TMReplayDeltaResponse>(
*header, buffers, handler);
break;
case protocol::mtCLOSE:
success =
detail::invoke<protocol::TMClose>(*header, buffers, handler);
break;
default:
handler.onMessageUnknown(header->message_type);
success = true;

View File

@@ -46,7 +46,7 @@ std::unordered_map<protocol::MessageType, TrafficCount::category> const
{protocol::mtTRANSACTIONS,
TrafficCount::category::requested_transactions},
{protocol::mtSQUELCH, TrafficCount::category::squelch},
};
{protocol::mtCLOSE, TrafficCount::category::close}};
TrafficCount::category
TrafficCount::categorize(

View File

@@ -195,7 +195,7 @@ public:
// The total p2p bytes sent and received on the wire
total,
close,
unknown // must be last
};
@@ -304,6 +304,7 @@ public:
{replay_delta_response, "replay_delta_response"},
{have_transactions, "have_transactions"},
{requested_transactions, "requested_transactions"},
{close, "close"},
{total, "total"}};
if (auto it = category_map.find(cat); it != category_map.end())
@@ -370,6 +371,7 @@ protected:
{have_transactions, {have_transactions}},
{requested_transactions, {requested_transactions}},
{total, {total}},
{close, {close}},
{unknown, {unknown}},
};
};