From d9695be838579efd4faae497ff23042c7ae9b899 Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Tue, 2 Sep 2025 11:34:28 +0200 Subject: [PATCH] refactor(overlay): Overhaul peer disconnection logic This commit refactors the peer shutdown and failure handling mechanism to be more robust, consistent, and communicative. The previous implementation used raw strings to represent error reasons and did not communicate these reasons to peers when shutting down a connection. With this change disconnections are now explicitly communicated via a `TMClose` protocol message with strongly-typed reasons. This new approach provides better diagnostics and makes the peer disconnection process more stable and predictable. --- include/xrpl/proto/ripple.proto | 17 +++ src/xrpld/overlay/detail/PeerImp.cpp | 168 ++++++++++++--------- src/xrpld/overlay/detail/PeerImp.h | 15 +- src/xrpld/overlay/detail/ProtocolMessage.h | 8 +- src/xrpld/overlay/detail/TrafficCount.cpp | 2 +- src/xrpld/overlay/detail/TrafficCount.h | 4 +- 6 files changed, 127 insertions(+), 87 deletions(-) diff --git a/include/xrpl/proto/ripple.proto b/include/xrpl/proto/ripple.proto index f93ebbc72c..6758693ec7 100644 --- a/include/xrpl/proto/ripple.proto +++ b/include/xrpl/proto/ripple.proto @@ -26,6 +26,7 @@ enum MessageType { mtREPLAY_DELTA_RESPONSE = 60; mtHAVE_TRANSACTIONS = 63; mtTRANSACTIONS = 64; + mtCLOSE = 65; } // token, iterations, target, challenge = issue demand for proof of work @@ -341,3 +342,19 @@ message TMReplayDeltaResponse { message TMHaveTransactions { repeated bytes hashes = 1; } + +enum TMCloseReason { + crRESOURCE = 1; + crINVALID_CLOSED_LEDGER = 2; + crINVALID_PREV_LEDGER = 3; + crBAD_LEDGER_HEADERS = 4; + crLARGE_SEND_QUEUE = 5; + crNOT_USEFUL = 6; + crPING_TIMEOUT = 7; + crINTERNAL = 8; + crSHUTDOWN = 9; +} + +message TMClose { + required TMCloseReason reason = 1; +} \ No newline at end of file diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index 69f25e1eb4..e059cf46db 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -64,6 +64,34 @@ std::chrono::seconds constexpr peerTimerInterval{60}; // TODO: Remove this exclusion once unit tests are added after the hotfix // release. +std::string +to_string(protocol::TMCloseReason reason) +{ + switch (reason) + { + case protocol::crRESOURCE: + return "Too Many P2P Messages"; + case protocol::crINVALID_CLOSED_LEDGER: + return "Invalid Closed Ledger Header"; + case protocol::crINVALID_PREV_LEDGER: + return "Invalid Previous Ledger Header"; + case protocol::crBAD_LEDGER_HEADERS: + return "Bad Ledger Headers"; + case protocol::crLARGE_SEND_QUEUE: + return "Large Send Queue"; + case protocol::crNOT_USEFUL: + return "Peer Not Useful"; + case protocol::crPING_TIMEOUT: + return "Ping Timeout"; + case protocol::crINTERNAL: + return "Internal Error"; + case protocol::crSHUTDOWN: + return "Shutdown"; + } + + return "unknown"; +} + PeerImp::PeerImp( Application& app, id_t id, @@ -178,7 +206,7 @@ PeerImp::run() closed = parseLedgerHash(iter->value()); if (!closed) - fail("Malformed handshake data (1)"); + fail(protocol::TMCloseReason::crINVALID_CLOSED_LEDGER); } if (auto const iter = headers_.find("Previous-Ledger"); @@ -187,11 +215,11 @@ PeerImp::run() previous = parseLedgerHash(iter->value()); if (!previous) - fail("Malformed handshake data (2)"); + fail(protocol::TMCloseReason::crINVALID_PREV_LEDGER); } if (previous && !closed) - fail("Malformed handshake data (3)"); + fail(protocol::TMCloseReason::crBAD_LEDGER_HEADERS); { std::lock_guard sl(recentLock_); @@ -231,7 +259,8 @@ PeerImp::stop() JLOG(journal_.info()) << "Stop"; } } - close(); + + sendAndClose(protocol::TMCloseReason::crSHUTDOWN); } //------------------------------------------------------------------------------ @@ -241,10 +270,6 @@ PeerImp::send(std::shared_ptr const& m) { if (!strand_.running_in_this_thread()) return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m)); - if (gracefulClose_) - return; - if (detaching_) - return; auto validator = m->getValidatorKey(); if (validator && !squelch_.expireSquelch(*validator)) @@ -356,7 +381,7 @@ PeerImp::charge(Resource::Charge const& fee, std::string const& context) { // Sever the connection overlay_.incPeerDisconnectCharges(); - fail("charge: Resources"); + fail(protocol::TMCloseReason::crRESOURCE); } } @@ -580,7 +605,6 @@ PeerImp::close() "ripple::PeerImp::close : strand in this thread"); if (socket_.is_open()) { - detaching_ = true; // DEPRECATED try { timer_.cancel(); @@ -604,22 +628,25 @@ PeerImp::close() } void -PeerImp::fail(std::string const& reason) +PeerImp::fail(protocol::TMCloseReason reason) { if (!strand_.running_in_this_thread()) return post( strand_, std::bind( - (void(Peer::*)(std::string const&)) & PeerImp::fail, + (void(Peer::*)(protocol::TMCloseReason)) & PeerImp::fail, shared_from_this(), reason)); - if (journal_.active(beast::severities::kWarning) && socket_.is_open()) + + if (journal_.active(beast::severities::kWarning) && socket_.is_open() && + reason != protocol::TMCloseReason::crINTERNAL) { std::string const n = name(); JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n) - << " failed: " << reason; + << " failed: " << to_string(reason); } - close(); + + sendAndClose(reason); } void @@ -634,28 +661,28 @@ PeerImp::fail(std::string const& name, error_code ec) << name << " from " << toBase58(TokenType::NodePublic, publicKey_) << " at " << remote_address_.to_string() << ": " << ec.message(); } - close(); + + sendAndClose(protocol::TMCloseReason::crINTERNAL); } void -PeerImp::gracefulClose() +PeerImp::sendAndClose(protocol::TMCloseReason reason) { - XRPL_ASSERT( - strand_.running_in_this_thread(), - "ripple::PeerImp::gracefulClose : strand in this thread"); - XRPL_ASSERT( - socket_.is_open(), "ripple::PeerImp::gracefulClose : socket is open"); - XRPL_ASSERT( - !gracefulClose_, - "ripple::PeerImp::gracefulClose : socket is not closing"); - gracefulClose_ = true; - if (send_queue_.size() > 0) + if (shutdown_) return; - setTimer(); - stream_.async_shutdown(bind_executor( - strand_, - std::bind( - &PeerImp::onShutdown, shared_from_this(), std::placeholders::_1))); + + // erase all outstanding messages except for the one + // currently being executed + if (send_queue_.size() > 1) + { + decltype(send_queue_) q({send_queue_.front()}); + send_queue_.swap(q); + } + + shutdown_ = true; + protocol::TMClose tmGC; + tmGC.set_reason(reason); + send(std::make_shared(tmGC, protocol::mtCLOSE)); } void @@ -713,14 +740,11 @@ PeerImp::onTimer(error_code const& ec) { // This should never happen JLOG(journal_.error()) << "onTimer: " << ec.message(); - return close(); + return fail(protocol::TMCloseReason::crINTERNAL); } if (large_sendq_++ >= Tuning::sendqIntervals) - { - fail("Large send queue"); - return; - } + return fail(protocol::TMCloseReason::crLARGE_SEND_QUEUE); if (auto const t = tracking_.load(); !inbound_ && t != Tracking::converged) { @@ -737,17 +761,13 @@ PeerImp::onTimer(error_code const& ec) (duration > app_.config().MAX_UNKNOWN_TIME))) { overlay_.peerFinder().on_failure(slot_); - fail("Not useful"); - return; + return fail(protocol::TMCloseReason::crNOT_USEFUL); } } // Already waiting for PONG if (lastPingSeq_) - { - fail("Ping Timeout"); - return; - } + return fail(protocol::TMCloseReason::crPING_TIMEOUT); lastPingTime_ = clock_type::now(); lastPingSeq_ = rand_int(); @@ -761,21 +781,6 @@ PeerImp::onTimer(error_code const& ec) setTimer(); } -void -PeerImp::onShutdown(error_code ec) -{ - cancelTimer(); - // If we don't get eof then something went wrong - if (!ec) - { - JLOG(journal_.error()) << "onShutdown: expected error condition"; - return close(); - } - if (ec != boost::asio::error::eof) - return fail("onShutdown", ec); - close(); -} - //------------------------------------------------------------------------------ void PeerImp::doAccept() @@ -791,7 +796,10 @@ PeerImp::doAccept() // This shouldn't fail since we already computed // the shared value successfully in OverlayImpl if (!sharedValue) - return fail("makeSharedValue: Unexpected failure"); + { + JLOG(journal_.error()) << "doAccept: makeSharedValue failed"; + return fail(protocol::TMCloseReason::crINTERNAL); + } JLOG(journal_.info()) << "Protocol: " << to_string(protocol_); JLOG(journal_.info()) << "Public Key: " @@ -841,7 +849,9 @@ PeerImp::doAccept() return fail("onWriteResponse", ec); if (write_buffer->size() == bytes_transferred) return doProtocolStart(); - return fail("Failed to write header"); + + JLOG(journal_.error()) << "Failed to write header"; + return fail(protocol::TMCloseReason::crINTERNAL); })); } @@ -905,15 +915,19 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred) { if (!socket_.is_open()) return; - if (ec == boost::asio::error::operation_aborted) + + // we started closing the local connection, stop reading + if (ec == boost::asio::error::operation_aborted || shutdown_) return; if (ec == boost::asio::error::eof) { + // Peer initiated connection close, just clean up JLOG(journal_.info()) << "EOF"; - return gracefulClose(); + return close(); } if (ec) return fail("onReadMessage", ec); + if (auto stream = journal_.trace()) { if (bytes_transferred > 0) @@ -945,8 +959,6 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred) return fail("onReadMessage", ec); if (!socket_.is_open()) return; - if (gracefulClose_) - return; if (bytes_consumed == 0) break; read_buffer_.consume(bytes_consumed); @@ -969,6 +981,7 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred) { if (!socket_.is_open()) return; + if (ec == boost::asio::error::operation_aborted) return; if (ec) @@ -1002,16 +1015,9 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred) std::placeholders::_1, std::placeholders::_2))); } - - if (gracefulClose_) - { - return stream_.async_shutdown(bind_executor( - strand_, - std::bind( - &PeerImp::onShutdown, - shared_from_this(), - std::placeholders::_1))); - } + // If the send queue is empty and we are shutting down, close the connection + else if (shutdown_) + close(); } //------------------------------------------------------------------------------ @@ -2746,6 +2752,20 @@ PeerImp::onMessage(std::shared_ptr const& m) << "onMessage: TMSquelch " << slice << " " << id() << " " << duration; } +void +PeerImp::onMessage(std::shared_ptr const& m) +{ + if (m->has_reason()) + { + JLOG(p_journal_.debug()) + << "onMessage: TMClose: peer closed the connection: " + << to_string(m->reason()); + } + + // do not send a close message when the peer initiates the shutdown + close(); +} + //-------------------------------------------------------------------------- void diff --git a/src/xrpld/overlay/detail/PeerImp.h b/src/xrpld/overlay/detail/PeerImp.h index 3d9a0c0b1e..88aae1ea7b 100644 --- a/src/xrpld/overlay/detail/PeerImp.h +++ b/src/xrpld/overlay/detail/PeerImp.h @@ -95,7 +95,7 @@ private: std::atomic tracking_; clock_type::time_point trackingTime_; - bool detaching_ = false; + bool shutdown_ = false; // Node public key of peer. PublicKey const publicKey_; std::string name_; @@ -175,7 +175,6 @@ private: http_response_type response_; boost::beast::http::fields const& headers_; std::queue> send_queue_; - bool gracefulClose_ = false; int large_sendq_ = 0; std::unique_ptr load_event_; // The highest sequence of each PublisherList that has @@ -426,7 +425,7 @@ public: isHighLatency() const override; void - fail(std::string const& reason); + fail(protocol::TMCloseReason reason); bool compressionEnabled() const override @@ -445,10 +444,10 @@ private: close(); void - fail(std::string const& name, error_code ec); + sendAndClose(protocol::TMCloseReason reason); void - gracefulClose(); + fail(std::string const& name, error_code ec); void setTimer(); @@ -463,10 +462,6 @@ private: void onTimer(boost::system::error_code const& ec); - // Called when SSL shutdown completes - void - onShutdown(error_code ec); - void doAccept(); @@ -584,6 +579,8 @@ public: onMessage(std::shared_ptr const& m); void onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); private: //-------------------------------------------------------------------------- diff --git a/src/xrpld/overlay/detail/ProtocolMessage.h b/src/xrpld/overlay/detail/ProtocolMessage.h index 26c83055a3..16de06f2a3 100644 --- a/src/xrpld/overlay/detail/ProtocolMessage.h +++ b/src/xrpld/overlay/detail/ProtocolMessage.h @@ -104,8 +104,8 @@ protocolMessageName(int type) return "replay_delta_request"; case protocol::mtREPLAY_DELTA_RESPONSE: return "replay_delta_response"; - default: - break; + case protocol::mtCLOSE: + return "close"; } return "unknown"; } @@ -470,6 +470,10 @@ invokeProtocolMessage( success = detail::invoke( *header, buffers, handler); break; + case protocol::mtCLOSE: + success = + detail::invoke(*header, buffers, handler); + break; default: handler.onMessageUnknown(header->message_type); success = true; diff --git a/src/xrpld/overlay/detail/TrafficCount.cpp b/src/xrpld/overlay/detail/TrafficCount.cpp index a2234a432e..ecd67bc93c 100644 --- a/src/xrpld/overlay/detail/TrafficCount.cpp +++ b/src/xrpld/overlay/detail/TrafficCount.cpp @@ -46,7 +46,7 @@ std::unordered_map const {protocol::mtTRANSACTIONS, TrafficCount::category::requested_transactions}, {protocol::mtSQUELCH, TrafficCount::category::squelch}, -}; + {protocol::mtCLOSE, TrafficCount::category::close}}; TrafficCount::category TrafficCount::categorize( diff --git a/src/xrpld/overlay/detail/TrafficCount.h b/src/xrpld/overlay/detail/TrafficCount.h index 8dc02def5f..647abf576b 100644 --- a/src/xrpld/overlay/detail/TrafficCount.h +++ b/src/xrpld/overlay/detail/TrafficCount.h @@ -195,7 +195,7 @@ public: // The total p2p bytes sent and received on the wire total, - + close, unknown // must be last }; @@ -304,6 +304,7 @@ public: {replay_delta_response, "replay_delta_response"}, {have_transactions, "have_transactions"}, {requested_transactions, "requested_transactions"}, + {close, "close"}, {total, "total"}}; if (auto it = category_map.find(cat); it != category_map.end()) @@ -370,6 +371,7 @@ protected: {have_transactions, {have_transactions}}, {requested_transactions, {requested_transactions}}, {total, {total}}, + {close, {close}}, {unknown, {unknown}}, }; };