Compare commits

...

2 Commits

Author SHA1 Message Date
Vito
a15abd4067 attempt to appease formatting gods 2025-09-02 15:08:57 +02:00
Vito
d9695be838 refactor(overlay): Overhaul peer disconnection logic
This commit refactors the peer shutdown and failure handling
mechanism to be more robust, consistent, and communicative.

The previous implementation used raw strings to represent error reasons
and did not communicate these reasons to peers when shutting down a connection.

With this change, disconnections are now explicitly communicated via a
`TMClose` protocol message with strongly-typed reasons. This new
approach provides better diagnostics and makes the peer disconnection
process more stable and predictable.
2025-09-02 15:00:12 +02:00
6 changed files with 127 additions and 87 deletions
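
Before reading the diffs, here is a minimal, self-contained C++ sketch of the pattern this commit introduces: instead of failing with a free-form string and closing the socket immediately, the peer now enqueues one typed close notification and tears the connection down only after it has been written. This is not rippled code — CloseReason, Connection, enqueue, and onWriteComplete are illustrative stand-ins for protocol::TMCloseReason, PeerImp, send, and onWriteMessage, and a plain std::queue<std::string> replaces the protobuf message queue.

#include <iostream>
#include <queue>
#include <string>

// Stand-in for protocol::TMCloseReason; the real values live in ripple.proto.
enum class CloseReason { Resource, LargeSendQueue, NotUseful, PingTimeout, Internal, Shutdown };

std::string
to_string(CloseReason r)
{
    switch (r)
    {
        case CloseReason::Resource:       return "Too Many P2P Messages";
        case CloseReason::LargeSendQueue: return "Large Send Queue";
        case CloseReason::NotUseful:      return "Peer Not Useful";
        case CloseReason::PingTimeout:    return "Ping Timeout";
        case CloseReason::Internal:       return "Internal Error";
        case CloseReason::Shutdown:       return "Shutdown";
    }
    return "unknown";
}

// Toy connection mirroring the sendAndClose() pattern from the diff:
// drop queued traffic, append one typed close notification, then tear the
// connection down only once that notification has been written.
class Connection
{
    std::queue<std::string> sendQueue_;  // stands in for send_queue_
    bool shutdown_ = false;              // stands in for shutdown_

public:
    void
    enqueue(std::string msg)
    {
        if (!shutdown_)
            sendQueue_.push(std::move(msg));
    }

    void
    sendAndClose(CloseReason reason)
    {
        if (shutdown_)
            return;
        // Keep only the message currently in flight; everything else is dropped.
        if (sendQueue_.size() > 1)
        {
            std::queue<std::string> q({sendQueue_.front()});
            sendQueue_.swap(q);
        }
        shutdown_ = true;
        sendQueue_.push("TMClose(" + to_string(reason) + ")");
    }

    // Invoked after each completed write (the role onWriteMessage plays).
    void
    onWriteComplete()
    {
        std::cout << "wrote: " << sendQueue_.front() << "\n";
        sendQueue_.pop();
        if (sendQueue_.empty() && shutdown_)
            std::cout << "close(): queue drained, connection torn down\n";
    }
};

int main()
{
    Connection c;
    c.enqueue("mtPING");
    c.enqueue("mtTRANSACTIONS");                  // will be dropped
    c.sendAndClose(CloseReason::LargeSendQueue);
    c.onWriteComplete();                          // writes mtPING
    c.onWriteComplete();                          // writes TMClose(...), then closes
}

The design point carried over from the diff: the shutdown path is idempotent (guarded by the shutdown flag), the send queue is trimmed to at most the in-flight message so the close notification is not stuck behind a backlog, and the socket is closed only after that final message drains.
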

View File

@@ -26,6 +26,7 @@ enum MessageType {
     mtREPLAY_DELTA_RESPONSE = 60;
     mtHAVE_TRANSACTIONS = 63;
     mtTRANSACTIONS = 64;
+    mtCLOSE = 65;
 }

 // token, iterations, target, challenge = issue demand for proof of work
@@ -341,3 +342,19 @@ message TMReplayDeltaResponse {
 message TMHaveTransactions {
     repeated bytes hashes = 1;
 }
+
+enum TMCloseReason {
+    crRESOURCE = 1;
+    crINVALID_CLOSED_LEDGER = 2;
+    crINVALID_PREV_LEDGER = 3;
+    crBAD_LEDGER_HEADERS = 4;
+    crLARGE_SEND_QUEUE = 5;
+    crNOT_USEFUL = 6;
+    crPING_TIMEOUT = 7;
+    crINTERNAL = 8;
+    crSHUTDOWN = 9;
+}
+
+message TMClose {
+    required TMCloseReason reason = 1;
+}

View File

@@ -64,6 +64,34 @@ std::chrono::seconds constexpr peerTimerInterval{60};
 // TODO: Remove this exclusion once unit tests are added after the hotfix
 // release.
+
+std::string
+to_string(protocol::TMCloseReason reason)
+{
+    switch (reason)
+    {
+        case protocol::crRESOURCE:
+            return "Too Many P2P Messages";
+        case protocol::crINVALID_CLOSED_LEDGER:
+            return "Invalid Closed Ledger Header";
+        case protocol::crINVALID_PREV_LEDGER:
+            return "Invalid Previous Ledger Header";
+        case protocol::crBAD_LEDGER_HEADERS:
+            return "Bad Ledger Headers";
+        case protocol::crLARGE_SEND_QUEUE:
+            return "Large Send Queue";
+        case protocol::crNOT_USEFUL:
+            return "Peer Not Useful";
+        case protocol::crPING_TIMEOUT:
+            return "Ping Timeout";
+        case protocol::crINTERNAL:
+            return "Internal Error";
+        case protocol::crSHUTDOWN:
+            return "Shutdown";
+    }
+    return "unknown";
+}
+
 PeerImp::PeerImp(
     Application& app,
     id_t id,
@@ -178,7 +206,7 @@ PeerImp::run()
         closed = parseLedgerHash(iter->value());

         if (!closed)
-            fail("Malformed handshake data (1)");
+            fail(protocol::TMCloseReason::crINVALID_CLOSED_LEDGER);
     }

     if (auto const iter = headers_.find("Previous-Ledger");
@@ -187,11 +215,11 @@ PeerImp::run()
         previous = parseLedgerHash(iter->value());

         if (!previous)
-            fail("Malformed handshake data (2)");
+            fail(protocol::TMCloseReason::crINVALID_PREV_LEDGER);
     }

     if (previous && !closed)
-        fail("Malformed handshake data (3)");
+        fail(protocol::TMCloseReason::crBAD_LEDGER_HEADERS);

     {
         std::lock_guard<std::mutex> sl(recentLock_);
@@ -231,7 +259,8 @@ PeerImp::stop()
             JLOG(journal_.info()) << "Stop";
         }
     }
-    close();
+
+    sendAndClose(protocol::TMCloseReason::crSHUTDOWN);
 }

 //------------------------------------------------------------------------------
@@ -241,10 +270,6 @@ PeerImp::send(std::shared_ptr<Message> const& m)
 {
     if (!strand_.running_in_this_thread())
         return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m));
-    if (gracefulClose_)
-        return;
-    if (detaching_)
-        return;

     auto validator = m->getValidatorKey();
     if (validator && !squelch_.expireSquelch(*validator))
@@ -356,7 +381,7 @@ PeerImp::charge(Resource::Charge const& fee, std::string const& context)
     {
         // Sever the connection
        overlay_.incPeerDisconnectCharges();
-        fail("charge: Resources");
+        fail(protocol::TMCloseReason::crRESOURCE);
     }
 }
@@ -580,7 +605,6 @@ PeerImp::close()
         "ripple::PeerImp::close : strand in this thread");
     if (socket_.is_open())
     {
-        detaching_ = true;  // DEPRECATED
        try
        {
            timer_.cancel();
@@ -604,22 +628,25 @@ PeerImp::close()
 }

 void
-PeerImp::fail(std::string const& reason)
+PeerImp::fail(protocol::TMCloseReason reason)
 {
     if (!strand_.running_in_this_thread())
         return post(
             strand_,
             std::bind(
-                (void(Peer::*)(std::string const&)) & PeerImp::fail,
+                (void(Peer::*)(protocol::TMCloseReason)) & PeerImp::fail,
                 shared_from_this(),
                 reason));

-    if (journal_.active(beast::severities::kWarning) && socket_.is_open())
+    if (journal_.active(beast::severities::kWarning) && socket_.is_open() &&
+        reason != protocol::TMCloseReason::crINTERNAL)
     {
         std::string const n = name();
         JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n)
-                              << " failed: " << reason;
+                              << " failed: " << to_string(reason);
     }
-    close();
+
+    sendAndClose(reason);
 }

 void
@@ -634,28 +661,28 @@ PeerImp::fail(std::string const& name, error_code ec)
             << name << " from " << toBase58(TokenType::NodePublic, publicKey_)
             << " at " << remote_address_.to_string() << ": " << ec.message();
     }
-    close();
+
+    sendAndClose(protocol::TMCloseReason::crINTERNAL);
 }

 void
-PeerImp::gracefulClose()
+PeerImp::sendAndClose(protocol::TMCloseReason reason)
 {
-    XRPL_ASSERT(
-        strand_.running_in_this_thread(),
-        "ripple::PeerImp::gracefulClose : strand in this thread");
-    XRPL_ASSERT(
-        socket_.is_open(), "ripple::PeerImp::gracefulClose : socket is open");
-    XRPL_ASSERT(
-        !gracefulClose_,
-        "ripple::PeerImp::gracefulClose : socket is not closing");
-    gracefulClose_ = true;
-    if (send_queue_.size() > 0)
+    if (shutdown_)
         return;
-    setTimer();
-    stream_.async_shutdown(bind_executor(
-        strand_,
-        std::bind(
-            &PeerImp::onShutdown, shared_from_this(), std::placeholders::_1)));
+
+    // erase all outstanding messages except for the one
+    // currently being executed
+    if (send_queue_.size() > 1)
+    {
+        decltype(send_queue_) q({send_queue_.front()});
+        send_queue_.swap(q);
+    }
+
+    shutdown_ = true;
+
+    protocol::TMClose tmGC;
+    tmGC.set_reason(reason);
+    send(std::make_shared<Message>(tmGC, protocol::mtCLOSE));
 }

 void
@@ -713,14 +740,11 @@ PeerImp::onTimer(error_code const& ec)
     {
         // This should never happen
         JLOG(journal_.error()) << "onTimer: " << ec.message();
-        return close();
+        return fail(protocol::TMCloseReason::crINTERNAL);
     }

     if (large_sendq_++ >= Tuning::sendqIntervals)
-    {
-        fail("Large send queue");
-        return;
-    }
+        return fail(protocol::TMCloseReason::crLARGE_SEND_QUEUE);

     if (auto const t = tracking_.load(); !inbound_ && t != Tracking::converged)
     {
@@ -737,17 +761,13 @@ PeerImp::onTimer(error_code const& ec)
             (duration > app_.config().MAX_UNKNOWN_TIME)))
         {
             overlay_.peerFinder().on_failure(slot_);
-            fail("Not useful");
-            return;
+            return fail(protocol::TMCloseReason::crNOT_USEFUL);
         }
     }

     // Already waiting for PONG
     if (lastPingSeq_)
-    {
-        fail("Ping Timeout");
-        return;
-    }
+        return fail(protocol::TMCloseReason::crPING_TIMEOUT);

     lastPingTime_ = clock_type::now();
     lastPingSeq_ = rand_int<std::uint32_t>();
@@ -761,21 +781,6 @@ PeerImp::onTimer(error_code const& ec)
     setTimer();
 }

-void
-PeerImp::onShutdown(error_code ec)
-{
-    cancelTimer();
-    // If we don't get eof then something went wrong
-    if (!ec)
-    {
-        JLOG(journal_.error()) << "onShutdown: expected error condition";
-        return close();
-    }
-    if (ec != boost::asio::error::eof)
-        return fail("onShutdown", ec);
-    close();
-}
-
 //------------------------------------------------------------------------------

 void
 PeerImp::doAccept()
@@ -791,7 +796,10 @@ PeerImp::doAccept()
     // This shouldn't fail since we already computed
     // the shared value successfully in OverlayImpl
     if (!sharedValue)
-        return fail("makeSharedValue: Unexpected failure");
+    {
+        JLOG(journal_.error()) << "doAccept: makeSharedValue failed";
+        return fail(protocol::TMCloseReason::crINTERNAL);
+    }

     JLOG(journal_.info()) << "Protocol: " << to_string(protocol_);
     JLOG(journal_.info()) << "Public Key: "
@@ -841,7 +849,9 @@ PeerImp::doAccept()
                 return fail("onWriteResponse", ec);
             if (write_buffer->size() == bytes_transferred)
                 return doProtocolStart();
-            return fail("Failed to write header");
+
+            JLOG(journal_.error()) << "Failed to write header";
+            return fail(protocol::TMCloseReason::crINTERNAL);
         }));
 }
@@ -905,15 +915,19 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
 {
     if (!socket_.is_open())
         return;
-    if (ec == boost::asio::error::operation_aborted)
+
+    // we started closing the local connection, stop reading
+    if (ec == boost::asio::error::operation_aborted || shutdown_)
         return;
     if (ec == boost::asio::error::eof)
     {
+        // Peer initiated connection close, just clean up
         JLOG(journal_.info()) << "EOF";
-        return gracefulClose();
+        return close();
     }
+
     if (ec)
         return fail("onReadMessage", ec);
     if (auto stream = journal_.trace())
     {
         if (bytes_transferred > 0)
@@ -945,8 +959,6 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
                 return fail("onReadMessage", ec);
             if (!socket_.is_open())
                 return;
-            if (gracefulClose_)
-                return;
             if (bytes_consumed == 0)
                 break;
             read_buffer_.consume(bytes_consumed);
@@ -969,6 +981,7 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
 {
     if (!socket_.is_open())
         return;
+
     if (ec == boost::asio::error::operation_aborted)
         return;
     if (ec)
@@ -1002,16 +1015,9 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
                 std::placeholders::_1,
                 std::placeholders::_2)));
     }
-
-    if (gracefulClose_)
-    {
-        return stream_.async_shutdown(bind_executor(
-            strand_,
-            std::bind(
-                &PeerImp::onShutdown,
-                shared_from_this(),
-                std::placeholders::_1)));
-    }
+    // If the send queue is empty and we are shutting down, close the connection
+    else if (shutdown_)
+        close();
 }

 //------------------------------------------------------------------------------
@@ -2746,6 +2752,20 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
         << "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
 }

+void
+PeerImp::onMessage(std::shared_ptr<protocol::TMClose> const& m)
+{
+    if (m->has_reason())
+    {
+        JLOG(p_journal_.debug())
+            << "onMessage: TMClose: peer closed the connection: "
+            << to_string(m->reason());
+    }
+
+    // do not send a close message when the peer initiates the shutdown
+    close();
+}
+
 //--------------------------------------------------------------------------

 void

View File

@@ -95,7 +95,7 @@ private:
     std::atomic<Tracking> tracking_;
     clock_type::time_point trackingTime_;
-    bool detaching_ = false;
+    bool shutdown_ = false;

     // Node public key of peer.
     PublicKey const publicKey_;
     std::string name_;
@@ -175,7 +175,6 @@ private:
     http_response_type response_;
     boost::beast::http::fields const& headers_;
     std::queue<std::shared_ptr<Message>> send_queue_;
-    bool gracefulClose_ = false;
     int large_sendq_ = 0;
     std::unique_ptr<LoadEvent> load_event_;

     // The highest sequence of each PublisherList that has
@@ -426,7 +425,7 @@ public:
     isHighLatency() const override;

     void
-    fail(std::string const& reason);
+    fail(protocol::TMCloseReason reason);

     bool
     compressionEnabled() const override
@@ -445,10 +444,10 @@ private:
     close();

     void
-    fail(std::string const& name, error_code ec);
+    sendAndClose(protocol::TMCloseReason reason);

     void
-    gracefulClose();
+    fail(std::string const& name, error_code ec);

     void
     setTimer();
@@ -463,10 +462,6 @@ private:
     void
     onTimer(boost::system::error_code const& ec);

-    // Called when SSL shutdown completes
-    void
-    onShutdown(error_code ec);
-
     void
     doAccept();
@@ -584,6 +579,8 @@ public:
     onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m);
     void
     onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m);
+    void
+    onMessage(std::shared_ptr<protocol::TMClose> const& m);

 private:
     //--------------------------------------------------------------------------

View File

@@ -104,8 +104,8 @@ protocolMessageName(int type)
             return "replay_delta_request";
         case protocol::mtREPLAY_DELTA_RESPONSE:
             return "replay_delta_response";
-        default:
-            break;
+        case protocol::mtCLOSE:
+            return "close";
     }
     return "unknown";
 }
@@ -470,6 +470,10 @@ invokeProtocolMessage(
             success = detail::invoke<protocol::TMReplayDeltaResponse>(
                 *header, buffers, handler);
             break;
+        case protocol::mtCLOSE:
+            success =
+                detail::invoke<protocol::TMClose>(*header, buffers, handler);
+            break;
         default:
             handler.onMessageUnknown(header->message_type);
             success = true;

View File

@@ -46,7 +46,7 @@ std::unordered_map<protocol::MessageType, TrafficCount::category> const
         {protocol::mtTRANSACTIONS,
          TrafficCount::category::requested_transactions},
         {protocol::mtSQUELCH, TrafficCount::category::squelch},
-};
+        {protocol::mtCLOSE, TrafficCount::category::close}};

 TrafficCount::category
 TrafficCount::categorize(

View File

@@ -195,7 +195,7 @@ public:
         // The total p2p bytes sent and received on the wire
         total,
-
+        close,

         unknown  // must be last
     };
@@ -304,6 +304,7 @@ public:
             {replay_delta_response, "replay_delta_response"},
             {have_transactions, "have_transactions"},
             {requested_transactions, "requested_transactions"},
+            {close, "close"},
             {total, "total"}};

         if (auto it = category_map.find(cat); it != category_map.end())
@@ -370,6 +371,7 @@ protected:
         {have_transactions, {have_transactions}},
         {requested_transactions, {requested_transactions}},
         {total, {total}},
+        {close, {close}},
         {unknown, {unknown}},
     };
}; };