From df82fe331191b0df7c2ca9ac9add246458cdf917 Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Tue, 26 Aug 2025 14:21:27 +0200 Subject: [PATCH] address review comments --- src/xrpld/overlay/detail/ConnectAttempt.cpp | 43 +++++++++------------ src/xrpld/overlay/detail/ConnectAttempt.h | 2 +- src/xrpld/overlay/detail/PeerImp.cpp | 41 +++++++++++--------- src/xrpld/overlay/detail/PeerImp.h | 7 ++-- 4 files changed, 45 insertions(+), 48 deletions(-) diff --git a/src/xrpld/overlay/detail/ConnectAttempt.cpp b/src/xrpld/overlay/detail/ConnectAttempt.cpp index 4f8b7da06a..bd0ec8b92f 100644 --- a/src/xrpld/overlay/detail/ConnectAttempt.cpp +++ b/src/xrpld/overlay/detail/ConnectAttempt.cpp @@ -62,7 +62,6 @@ ConnectAttempt::~ConnectAttempt() { if (slot_ != nullptr) overlay_.peerFinder().on_closed(slot_); - JLOG(journal_.trace()) << "~ConnectAttempt"; } void @@ -75,7 +74,7 @@ ConnectAttempt::stop() if (!socket_.is_open()) return; - JLOG(journal_.debug()) << "Stop"; + JLOG(journal_.debug()) << "stop: Stop"; shutdown(); } @@ -83,7 +82,7 @@ ConnectAttempt::stop() void ConnectAttempt::run() { - isIOInProgress_ = true; + ioPending_ = true; stream_.next_layer().async_connect( remote_endpoint_, @@ -122,15 +121,11 @@ ConnectAttempt::tryAsyncShutdown() if (!shutdown_ || shutdownStarted_) return; - if (isIOInProgress_) + if (ioPending_) return; shutdownStarted_ = true; - XRPL_ASSERT( - !isIOInProgress_, - "ripple::ConnectAttempt::tryAsyncShutdown: io not in progress"); - setTimer(); // gracefully shutdown the SSL socket, performing a shutdown handshake @@ -178,7 +173,7 @@ ConnectAttempt::close() error_code ec; socket_.close(ec); - JLOG(journal_.debug()) << "close: Closed"; + JLOG(journal_.info()) << "close: Closed"; } void @@ -205,7 +200,7 @@ ConnectAttempt::setTimer() catch (std::exception const& ex) { JLOG(journal_.error()) << "setTimer: " << ex.what(); - return; + return close(); }; timer_.async_wait(strand_.wrap(std::bind( @@ -227,8 +222,9 @@ 
ConnectAttempt::onTimer(error_code ec) if (ec) { + // do not initiate shutdown, timers are frequently cancelled if (ec == boost::asio::error::operation_aborted) - return tryAsyncShutdown(); + return; // This should never happen JLOG(journal_.error()) << "onTimer: " << ec.message(); @@ -243,7 +239,7 @@ ConnectAttempt::onConnect(error_code ec) { cancelTimer(); - isIOInProgress_ = false; + ioPending_ = false; if (ec) { @@ -261,14 +257,12 @@ ConnectAttempt::onConnect(error_code ec) if (ec) return fail("onConnect", ec); - JLOG(journal_.trace()) << "onConnect"; - if (shutdown_) return tryAsyncShutdown(); setTimer(); - isIOInProgress_ = true; + ioPending_ = true; stream_.set_verify_mode(boost::asio::ssl::verify_none); stream_.async_handshake( @@ -284,7 +278,7 @@ ConnectAttempt::onHandshake(error_code ec) { cancelTimer(); - isIOInProgress_ = false; + ioPending_ = false; if (ec) { @@ -327,7 +321,7 @@ ConnectAttempt::onHandshake(error_code ec) setTimer(); - isIOInProgress_ = true; + ioPending_ = true; boost::beast::http::async_write( stream_, @@ -343,7 +337,7 @@ ConnectAttempt::onWrite(error_code ec) { cancelTimer(); - isIOInProgress_ = false; + ioPending_ = false; if (ec) { @@ -357,7 +351,7 @@ ConnectAttempt::onWrite(error_code ec) return tryAsyncShutdown(); setTimer(); - isIOInProgress_ = true; + ioPending_ = true; boost::beast::http::async_read( stream_, @@ -374,7 +368,7 @@ ConnectAttempt::onRead(error_code ec) { cancelTimer(); - isIOInProgress_ = false; + ioPending_ = false; if (ec) { @@ -474,11 +468,10 @@ ConnectAttempt::processResponse() remote_endpoint_.address(), app_); - JLOG(journal_.info()) - << "Public Key: " << toBase58(TokenType::NodePublic, publicKey); - JLOG(journal_.debug()) << "Protocol: " << to_string(*negotiatedProtocol); + JLOG(journal_.info()) + << "Public Key: " << toBase58(TokenType::NodePublic, publicKey); auto const member = app_.cluster().member(publicKey); if (member) @@ -486,8 +479,8 @@ ConnectAttempt::processResponse() JLOG(journal_.info()) << 
"Cluster name: " << *member; } - auto const result = overlay_.peerFinder().activate( - slot_, publicKey, static_cast<bool>(member)); + auto const result = overlay_.peerFinder().activate( + slot_, publicKey, member.has_value()); if (result != PeerFinder::Result::success) { std::stringstream ss; diff --git a/src/xrpld/overlay/detail/ConnectAttempt.h b/src/xrpld/overlay/detail/ConnectAttempt.h index 6661d56cee..8b4cdc05d6 100644 --- a/src/xrpld/overlay/detail/ConnectAttempt.h +++ b/src/xrpld/overlay/detail/ConnectAttempt.h @@ -60,7 +60,7 @@ private: std::shared_ptr slot_; request_type req_; bool shutdown_ = false; - bool isIOInProgress_ = false; + bool ioPending_ = false; bool shutdownStarted_ = false; public: diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index 9ef2d81cb2..56ddb69c78 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -44,6 +44,7 @@ #include #include +#include #include #include #include @@ -59,6 +60,10 @@ std::chrono::milliseconds constexpr peerHighLatency{300}; /** How often we PING the peer to check for latency and sendq probe */ std::chrono::seconds constexpr peerTimerInterval{60}; + +/** The timeout for a shutdown timer */ +std::chrono::seconds constexpr shutdownTimerInterval{5}; + } // namespace // TODO: Remove this exclusion once unit tests are added after the hotfix @@ -223,7 +228,7 @@ PeerImp::stop() // outbound connections are under our control and may be logged // at a higher level, but inbound connections are more numerous and // uncontrolled so to prevent log flooding the severity is reduced. - JLOG((inbound_ ? 
journal_.debug() : journal_.info())) << "stop: Stop"; + JLOG(journal_.debug()) << "stop: Stop"; shutdown(); } @@ -622,16 +627,12 @@ PeerImp::tryAsyncShutdown() if (!shutdown_ || shutdownStarted_) return; - if (readInProgress_ || writeInProgress_) + if (readPending_ || writePending_) return; shutdownStarted_ = true; - XRPL_ASSERT( - !readInProgress_ && !writeInProgress_, - "ripple::PeerImp::tryAsyncShutdown : read and write not in progress"); - - setTimer(); + setTimer(shutdownTimerInterval); // gracefully shutdown the SSL socket, performing a shutdown handshake stream_.async_shutdown(bind_executor( @@ -705,11 +706,11 @@ PeerImp::close() //------------------------------------------------------------------------------ void -PeerImp::setTimer() +PeerImp::setTimer(std::chrono::seconds interval) { try { - timer_.expires_after(peerTimerInterval); + timer_.expires_after(interval); } catch (std::exception const& ex) { @@ -731,6 +732,7 @@ PeerImp::onTimer(error_code const& ec) if (ec) { + // do not initiate shutdown, timers are frequently cancelled if (ec == boost::asio::error::operation_aborted) return; @@ -743,7 +745,7 @@ PeerImp::onTimer(error_code const& ec) // force close the connection if (shutdown_) { - JLOG(journal_.warn()) << "onTimer: shutdown timer expired"; + JLOG(journal_.debug()) << "onTimer: shutdown timer expired"; return close(); } @@ -782,7 +784,7 @@ PeerImp::onTimer(error_code const& ec) send(std::make_shared(message, protocol::mtPING)); - setTimer(); + setTimer(peerTimerInterval); } void @@ -815,7 +817,8 @@ PeerImp::doAccept() "ripple::PeerImp::doAccept : empty read buffer"); JLOG(journal_.debug()) << "doAccept: " << remote_address_; - // a shutdown was initiated before the handshare, there is nothing to do + + // a shutdown was initiated before the handshake, there is nothing to do if (shutdown_) return tryAsyncShutdown(); @@ -826,7 +829,7 @@ PeerImp::doAccept() if (!sharedValue) return fail("makeSharedValue: Unexpected failure"); - 
JLOG(journal_.info()) << "Protocol: " << to_string(protocol_); + JLOG(journal_.debug()) << "Protocol: " << to_string(protocol_); JLOG(journal_.info()) << "Public Key: " << toBase58(TokenType::NodePublic, publicKey_); @@ -930,7 +933,7 @@ PeerImp::doProtocolStart() if (auto m = overlay_.getManifestsMessage()) send(m); - setTimer(); + setTimer(peerTimerInterval); } // Called repeatedly with protocol message data @@ -941,7 +944,7 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred) strand_.running_in_this_thread(), "ripple::PeerImp::onReadMessage : strand in this thread"); - readInProgress_ = false; + readPending_ = false; if (!socket_.is_open()) return; @@ -950,7 +953,7 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred) { if (ec == boost::asio::error::eof) { - JLOG(journal_.info()) << "EOF"; + JLOG(journal_.debug()) << "EOF"; return shutdown(); } @@ -1004,7 +1007,7 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred) if (shutdown_) return tryAsyncShutdown(); - readInProgress_ = true; + readPending_ = true; XRPL_ASSERT( !shutdownStarted_, "ripple::PeerImp::onReadMessage : shutdown started"); @@ -1028,7 +1031,7 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred) strand_.running_in_this_thread(), "ripple::PeerImp::onWriteMessage : strand in this thread"); - writeInProgress_ = false; + writePending_ = false; if (!socket_.is_open()) return; @@ -1061,7 +1064,7 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred) if (!send_queue_.empty()) { - writeInProgress_ = true; + writePending_ = true; XRPL_ASSERT( !shutdownStarted_, "ripple::PeerImp::onWriteMessage : shutdown started"); diff --git a/src/xrpld/overlay/detail/PeerImp.h b/src/xrpld/overlay/detail/PeerImp.h index a45fc99079..9725d4371b 100644 --- a/src/xrpld/overlay/detail/PeerImp.h +++ b/src/xrpld/overlay/detail/PeerImp.h @@ -41,6 +41,7 @@ #include #include +#include #include #include #include @@ -177,8 +178,8 @@ private: 
std::queue> send_queue_; bool shutdown_ = false; bool shutdownStarted_ = false; - bool readInProgress_ = false; - bool writeInProgress_ = false; + bool readPending_ = false; + bool writePending_ = false; int large_sendq_ = 0; std::unique_ptr load_event_; // The highest sequence of each PublisherList that has @@ -537,7 +538,7 @@ private: * @note This function will terminate the connection in case of any errors. */ void - setTimer(); + setTimer(std::chrono::seconds interval); /** * @brief Handles the expiration of the peer activity timer.