address review comments

This commit is contained in:
Vito
2025-08-26 14:21:27 +02:00
parent a34d2139a7
commit df82fe3311
4 changed files with 45 additions and 48 deletions

View File

@@ -62,7 +62,6 @@ ConnectAttempt::~ConnectAttempt()
{
if (slot_ != nullptr)
overlay_.peerFinder().on_closed(slot_);
JLOG(journal_.trace()) << "~ConnectAttempt";
}
void
@@ -75,7 +74,7 @@ ConnectAttempt::stop()
if (!socket_.is_open())
return;
JLOG(journal_.debug()) << "Stop";
JLOG(journal_.debug()) << "stop: Stop";
shutdown();
}
@@ -83,7 +82,7 @@ ConnectAttempt::stop()
void
ConnectAttempt::run()
{
isIOInProgress_ = true;
ioPending_ = true;
stream_.next_layer().async_connect(
remote_endpoint_,
@@ -122,15 +121,11 @@ ConnectAttempt::tryAsyncShutdown()
if (!shutdown_ || shutdownStarted_)
return;
if (isIOInProgress_)
if (ioPending_)
return;
shutdownStarted_ = true;
XRPL_ASSERT(
!isIOInProgress_,
"ripple::ConnectAttempt::tryAsyncShutdown: io not in progress");
setTimer();
// gracefully shutdown the SSL socket, performing a shutdown handshake
@@ -178,7 +173,7 @@ ConnectAttempt::close()
error_code ec;
socket_.close(ec);
JLOG(journal_.debug()) << "close: Closed";
JLOG(journal_.info()) << "close: Closed";
}
void
@@ -205,7 +200,7 @@ ConnectAttempt::setTimer()
catch (std::exception const& ex)
{
JLOG(journal_.error()) << "setTimer: " << ex.what();
return;
return close();
};
timer_.async_wait(strand_.wrap(std::bind(
@@ -227,8 +222,9 @@ ConnectAttempt::onTimer(error_code ec)
if (ec)
{
// do not initiate shutdown, timers are frequently cancelled
if (ec == boost::asio::error::operation_aborted)
return tryAsyncShutdown();
return;
// This should never happen
JLOG(journal_.error()) << "onTimer: " << ec.message();
@@ -243,7 +239,7 @@ ConnectAttempt::onConnect(error_code ec)
{
cancelTimer();
isIOInProgress_ = false;
ioPending_ = false;
if (ec)
{
@@ -261,14 +257,12 @@ ConnectAttempt::onConnect(error_code ec)
if (ec)
return fail("onConnect", ec);
JLOG(journal_.trace()) << "onConnect";
if (shutdown_)
return tryAsyncShutdown();
setTimer();
isIOInProgress_ = true;
ioPending_ = true;
stream_.set_verify_mode(boost::asio::ssl::verify_none);
stream_.async_handshake(
@@ -284,7 +278,7 @@ ConnectAttempt::onHandshake(error_code ec)
{
cancelTimer();
isIOInProgress_ = false;
ioPending_ = false;
if (ec)
{
@@ -327,7 +321,7 @@ ConnectAttempt::onHandshake(error_code ec)
setTimer();
isIOInProgress_ = true;
ioPending_ = true;
boost::beast::http::async_write(
stream_,
@@ -343,7 +337,7 @@ ConnectAttempt::onWrite(error_code ec)
{
cancelTimer();
isIOInProgress_ = false;
ioPending_ = false;
if (ec)
{
@@ -357,7 +351,7 @@ ConnectAttempt::onWrite(error_code ec)
return tryAsyncShutdown();
setTimer();
isIOInProgress_ = true;
ioPending_ = true;
boost::beast::http::async_read(
stream_,
@@ -374,7 +368,7 @@ ConnectAttempt::onRead(error_code ec)
{
cancelTimer();
isIOInProgress_ = false;
ioPending_ = false;
if (ec)
{
@@ -474,11 +468,10 @@ ConnectAttempt::processResponse()
remote_endpoint_.address(),
app_);
JLOG(journal_.info())
<< "Public Key: " << toBase58(TokenType::NodePublic, publicKey);
JLOG(journal_.debug())
<< "Protocol: " << to_string(*negotiatedProtocol);
JLOG(journal_.info())
<< "Public Key: " << toBase58(TokenType::NodePublic, publicKey);
auto const member = app_.cluster().member(publicKey);
if (member)
@@ -486,8 +479,8 @@ ConnectAttempt::processResponse()
JLOG(journal_.info()) << "Cluster name: " << *member;
}
auto const result = overlay_.peerFinder().activate(
slot_, publicKey, static_cast<bool>(member));
auto const result =
overlay_.peerFinder().activate(slot_, publicKey, !member->empty());
if (result != PeerFinder::Result::success)
{
std::stringstream ss;

View File

@@ -60,7 +60,7 @@ private:
std::shared_ptr<PeerFinder::Slot> slot_;
request_type req_;
bool shutdown_ = false;
bool isIOInProgress_ = false;
bool ioPending_ = false;
bool shutdownStarted_ = false;
public:

View File

@@ -44,6 +44,7 @@
#include <boost/beast/core/ostream.hpp>
#include <algorithm>
#include <chrono>
#include <memory>
#include <mutex>
#include <numeric>
@@ -59,6 +60,10 @@ std::chrono::milliseconds constexpr peerHighLatency{300};
/** How often we PING the peer to check for latency and sendq probe */
std::chrono::seconds constexpr peerTimerInterval{60};
/** The timeout for a shutdown timer */
std::chrono::seconds constexpr shutdownTimerInterval{5};
} // namespace
// TODO: Remove this exclusion once unit tests are added after the hotfix
@@ -223,7 +228,7 @@ PeerImp::stop()
// outbound connections are under our control and may be logged
// at a higher level, but inbound connections are more numerous and
// uncontrolled so to prevent log flooding the severity is reduced.
JLOG((inbound_ ? journal_.debug() : journal_.info())) << "stop: Stop";
JLOG(journal_.debug()) << "stop: Stop";
shutdown();
}
@@ -622,16 +627,12 @@ PeerImp::tryAsyncShutdown()
if (!shutdown_ || shutdownStarted_)
return;
if (readInProgress_ || writeInProgress_)
if (readPending_ || writePending_)
return;
shutdownStarted_ = true;
XRPL_ASSERT(
!readInProgress_ && !writeInProgress_,
"ripple::PeerImp::tryAsyncShutdown : read and write not in progress");
setTimer();
setTimer(shutdownTimerInterval);
// gracefully shutdown the SSL socket, performing a shutdown handshake
stream_.async_shutdown(bind_executor(
@@ -705,11 +706,11 @@ PeerImp::close()
//------------------------------------------------------------------------------
void
PeerImp::setTimer()
PeerImp::setTimer(std::chrono::seconds interval)
{
try
{
timer_.expires_after(peerTimerInterval);
timer_.expires_after(interval);
}
catch (std::exception const& ex)
{
@@ -731,6 +732,7 @@ PeerImp::onTimer(error_code const& ec)
if (ec)
{
// do not initiate shutdown, timers are frequently cancelled
if (ec == boost::asio::error::operation_aborted)
return;
@@ -743,7 +745,7 @@ PeerImp::onTimer(error_code const& ec)
// force close the connection
if (shutdown_)
{
JLOG(journal_.warn()) << "onTimer: shutdown timer expired";
JLOG(journal_.debug()) << "onTimer: shutdown timer expired";
return close();
}
@@ -782,7 +784,7 @@ PeerImp::onTimer(error_code const& ec)
send(std::make_shared<Message>(message, protocol::mtPING));
setTimer();
setTimer(peerTimerInterval);
}
void
@@ -815,7 +817,8 @@ PeerImp::doAccept()
"ripple::PeerImp::doAccept : empty read buffer");
JLOG(journal_.debug()) << "doAccept: " << remote_address_;
// a shutdown was initiated before the handshare, there is nothing to do
// a shutdown was initiated before the handshake, there is nothing to do
if (shutdown_)
return tryAsyncShutdown();
@@ -826,7 +829,7 @@ PeerImp::doAccept()
if (!sharedValue)
return fail("makeSharedValue: Unexpected failure");
JLOG(journal_.info()) << "Protocol: " << to_string(protocol_);
JLOG(journal_.debug()) << "Protocol: " << to_string(protocol_);
JLOG(journal_.info()) << "Public Key: "
<< toBase58(TokenType::NodePublic, publicKey_);
@@ -930,7 +933,7 @@ PeerImp::doProtocolStart()
if (auto m = overlay_.getManifestsMessage())
send(m);
setTimer();
setTimer(peerTimerInterval);
}
// Called repeatedly with protocol message data
@@ -941,7 +944,7 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
strand_.running_in_this_thread(),
"ripple::PeerImp::onReadMessage : strand in this thread");
readInProgress_ = false;
readPending_ = false;
if (!socket_.is_open())
return;
@@ -950,7 +953,7 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
{
if (ec == boost::asio::error::eof)
{
JLOG(journal_.info()) << "EOF";
JLOG(journal_.debug()) << "EOF";
return shutdown();
}
@@ -1004,7 +1007,7 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
if (shutdown_)
return tryAsyncShutdown();
readInProgress_ = true;
readPending_ = true;
XRPL_ASSERT(
!shutdownStarted_, "ripple::PeerImp::onReadMessage : shutdown started");
@@ -1028,7 +1031,7 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
strand_.running_in_this_thread(),
"ripple::PeerImp::onWriteMessage : strand in this thread");
writeInProgress_ = false;
writePending_ = false;
if (!socket_.is_open())
return;
@@ -1061,7 +1064,7 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
if (!send_queue_.empty())
{
writeInProgress_ = true;
writePending_ = true;
XRPL_ASSERT(
!shutdownStarted_,
"ripple::PeerImp::onWriteMessage : shutdown started");

View File

@@ -41,6 +41,7 @@
#include <boost/thread/shared_mutex.hpp>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <optional>
#include <queue>
@@ -177,8 +178,8 @@ private:
std::queue<std::shared_ptr<Message>> send_queue_;
bool shutdown_ = false;
bool shutdownStarted_ = false;
bool readInProgress_ = false;
bool writeInProgress_ = false;
bool readPending_ = false;
bool writePending_ = false;
int large_sendq_ = 0;
std::unique_ptr<LoadEvent> load_event_;
// The highest sequence of each PublisherList that has
@@ -537,7 +538,7 @@ private:
* @note This function will terminate the connection in case of any errors.
*/
void
setTimer();
setTimer(std::chrono::seconds interval);
/**
* @brief Handles the expiration of the peer activity timer.