Compare commits

..

2 Commits

Author SHA1 Message Date
Vito
a15abd4067 attempt to appease formatting gods 2025-09-02 15:08:57 +02:00
Vito
d9695be838 refactor(overlay): Overhaul peer disconnection logic
This commit refactors the peer shutdown and failure handling
mechanism to be more robust, consistent, and communicative.

The previous implementation used raw strings to represent error reasons
and did not communicate these reasons to peers when shutting down a connection.

With this change disconnections are now explicitly communicated via a
`TMClose` protocol message with strongly-typed reasons. This new
approach provides better diagnostics and makes the peer disconnection
process more stable and predictable.
2025-09-02 15:00:12 +02:00
7 changed files with 127 additions and 250 deletions

View File

@@ -1,163 +0,0 @@
# This workflow builds the binary from the selected commit (not earlier than 2.5.0 release)
name: Build selected commit
# This workflow can only be triggered manually, by a project maintainer
on:
workflow_dispatch:
inputs:
commit:
description: "Commit to build from."
required: false
type: string
build_container:
description: "Build container image to use"
required: true
type: string
default: ghcr.io/xrplf/ci/debian-bullseye:gcc-12
strip_symbols:
description: "Strip debug symbols"
required: true
type: boolean
default: true
archive_archive:
description: "Archive rippled binary"
required: true
type: boolean
default: false
build_only:
description: "Only build, do not run unit tests"
required: true
type: boolean
default: false
build_type:
description: "Build type (Debug or Release)"
required: true
type: choice
default: Release
options:
- Debug
- Release
cmake_args:
description: "CMake args for build"
required: true
type: string
default: "-Dxrpld=ON -Dtests=ON -Dassert=OFF -Dunity=OFF"
dependencies_force_build:
description: "Force building of all dependencies."
required: false
type: boolean
default: false
env:
CONAN_REMOTE_NAME: xrplf
CONAN_REMOTE_URL: https://conan.ripplex.io
BUILD_DIR: .build
jobs:
build:
runs-on: ["self-hosted", "Linux", "X64", "heavy"]
container: ${{ inputs.build_container }}
steps:
- name: Checkout this workflow
uses: actions/checkout@08eba0b27e820071cde6df949e0beb9ba4906955 # v4.3.0
with:
sparse-checkout: |
.github
conan
- name: Move workflow files on a side
run: |
mkdir -p ${{ runner.temp }}
mv .github conan ${{ runner.temp }}
rm -rf .git
- name: Checkout the commit to build
uses: actions/checkout@08eba0b27e820071cde6df949e0beb9ba4906955 # v4.3.0
with:
ref: ${{ inputs.commit }}
- name: Restore workflow files
run: |
rm -rf .github conan
mv ${{ runner.temp }}/.github .
mv ${{ runner.temp }}/conan .
- name: Prepare runner
uses: XRPLF/actions/.github/actions/prepare-runner@638e0dc11ea230f91bd26622fb542116bb5254d5
with:
disable_ccache: true
- name: Check configuration
run: |
echo 'Checking path.'
echo ${PATH} | tr ':' '\n'
echo 'Checking environment variables.'
env | sort
echo 'Checking CMake version.'
cmake --version
echo 'Checking compiler version.'
${CC} --version
echo 'Checking Conan version.'
conan --version
echo 'Checking Ninja version.'
ninja --version
echo 'Checking nproc version.'
nproc --version
- name: Set up Conan configuration
run: |
echo 'Installing configuration.'
cat conan/global.conf >> $(conan config home)/global.conf
echo 'Conan configuration:'
conan config show '*'
- name: Set up Conan profile
run: |
echo 'Installing profile.'
conan config install conan/profiles/default -tf $(conan config home)/profiles/
echo 'Conan profile:'
conan profile show
- name: Set up Conan remote
shell: bash
run: |
echo "Adding Conan remote '${{ env.CONAN_REMOTE_NAME }}' at ${{ env.CONAN_REMOTE_URL }}."
conan remote add --index 0 --force ${{ env.CONAN_REMOTE_NAME }} ${{ env.CONAN_REMOTE_URL }}
echo 'Listing Conan remotes.'
conan remote list
- name: Build dependencies
uses: ./.github/actions/build-deps
with:
build_dir: ${{ env.BUILD_DIR }}
build_type: ${{ inputs.build_type }}
conan_remote_name: ${{ env.CONAN_REMOTE_NAME }}
conan_remote_url: ${{ env.CONAN_REMOTE_URL }}
force_build: ${{ inputs.dependencies_force_build }}
force_upload: false
- name: Build and test binary
uses: ./.github/actions/build-test
with:
build_dir: ${{ env.BUILD_DIR }}
build_only: ${{ inputs.build_only }}
build_type: ${{ inputs.build_type }}
cmake_args: ${{ inputs.cmake_args }}
cmake_target: "all"
os: "linux"
- name: Strip symbols
if: ${{ inputs.strip_symbols == 'true' }}
run: |
strip -D --strip-unneeded ${{ env.BUILD_DIR }}/rippled
${{ env.BUILD_DIR }}/rippled --version
- name: Move the binary
run: |
mv ${{ env.BUILD_DIR }}/rippled .
- name: Archive rippled binary
if: ${{ inputs.archive_archive == 'true' }}
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
with:
name: rippled
path: ./rippled
retention-days: 90
compression-level: 8
overwrite: true

View File

@@ -26,6 +26,7 @@ enum MessageType {
mtREPLAY_DELTA_RESPONSE = 60;
mtHAVE_TRANSACTIONS = 63;
mtTRANSACTIONS = 64;
mtCLOSE = 65;
}
// token, iterations, target, challenge = issue demand for proof of work
@@ -341,3 +342,19 @@ message TMReplayDeltaResponse {
message TMHaveTransactions {
repeated bytes hashes = 1;
}
enum TMCloseReason {
crRESOURCE = 1;
crINVALID_CLOSED_LEDGER = 2;
crINVALID_PREV_LEDGER = 3;
crBAD_LEDGER_HEADERS = 4;
crLARGE_SEND_QUEUE = 5;
crNOT_USEFUL = 6;
crPING_TIMEOUT = 7;
crINTERNAL = 8;
crSHUTDOWN = 9;
}
message TMClose {
required TMCloseReason reason = 1;
}

View File

@@ -64,6 +64,34 @@ std::chrono::seconds constexpr peerTimerInterval{60};
// TODO: Remove this exclusion once unit tests are added after the hotfix
// release.
std::string
to_string(protocol::TMCloseReason reason)
{
switch (reason)
{
case protocol::crRESOURCE:
return "Too Many P2P Messages";
case protocol::crINVALID_CLOSED_LEDGER:
return "Invalid Closed Ledger Header";
case protocol::crINVALID_PREV_LEDGER:
return "Invalid Previous Ledger Header";
case protocol::crBAD_LEDGER_HEADERS:
return "Bad Ledger Headers";
case protocol::crLARGE_SEND_QUEUE:
return "Large Send Queue";
case protocol::crNOT_USEFUL:
return "Peer Not Useful";
case protocol::crPING_TIMEOUT:
return "Ping Timeout";
case protocol::crINTERNAL:
return "Internal Error";
case protocol::crSHUTDOWN:
return "Shutdown";
}
return "unknown";
}
PeerImp::PeerImp(
Application& app,
id_t id,
@@ -178,7 +206,7 @@ PeerImp::run()
closed = parseLedgerHash(iter->value());
if (!closed)
fail("Malformed handshake data (1)");
fail(protocol::TMCloseReason::crINVALID_CLOSED_LEDGER);
}
if (auto const iter = headers_.find("Previous-Ledger");
@@ -187,11 +215,11 @@ PeerImp::run()
previous = parseLedgerHash(iter->value());
if (!previous)
fail("Malformed handshake data (2)");
fail(protocol::TMCloseReason::crINVALID_PREV_LEDGER);
}
if (previous && !closed)
fail("Malformed handshake data (3)");
fail(protocol::TMCloseReason::crBAD_LEDGER_HEADERS);
{
std::lock_guard<std::mutex> sl(recentLock_);
@@ -231,7 +259,8 @@ PeerImp::stop()
JLOG(journal_.info()) << "Stop";
}
}
close();
sendAndClose(protocol::TMCloseReason::crSHUTDOWN);
}
//------------------------------------------------------------------------------
@@ -241,10 +270,6 @@ PeerImp::send(std::shared_ptr<Message> const& m)
{
if (!strand_.running_in_this_thread())
return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m));
if (gracefulClose_)
return;
if (detaching_)
return;
auto validator = m->getValidatorKey();
if (validator && !squelch_.expireSquelch(*validator))
@@ -356,7 +381,7 @@ PeerImp::charge(Resource::Charge const& fee, std::string const& context)
{
// Sever the connection
overlay_.incPeerDisconnectCharges();
fail("charge: Resources");
fail(protocol::TMCloseReason::crRESOURCE);
}
}
@@ -580,7 +605,6 @@ PeerImp::close()
"ripple::PeerImp::close : strand in this thread");
if (socket_.is_open())
{
detaching_ = true; // DEPRECATED
try
{
timer_.cancel();
@@ -604,22 +628,25 @@ PeerImp::close()
}
void
PeerImp::fail(std::string const& reason)
PeerImp::fail(protocol::TMCloseReason reason)
{
if (!strand_.running_in_this_thread())
return post(
strand_,
std::bind(
(void(Peer::*)(std::string const&)) & PeerImp::fail,
(void(Peer::*)(protocol::TMCloseReason)) & PeerImp::fail,
shared_from_this(),
reason));
if (journal_.active(beast::severities::kWarning) && socket_.is_open())
if (journal_.active(beast::severities::kWarning) && socket_.is_open() &&
reason != protocol::TMCloseReason::crINTERNAL)
{
std::string const n = name();
JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n)
<< " failed: " << reason;
<< " failed: " << to_string(reason);
}
close();
sendAndClose(reason);
}
void
@@ -634,28 +661,28 @@ PeerImp::fail(std::string const& name, error_code ec)
<< name << " from " << toBase58(TokenType::NodePublic, publicKey_)
<< " at " << remote_address_.to_string() << ": " << ec.message();
}
close();
sendAndClose(protocol::TMCloseReason::crINTERNAL);
}
void
PeerImp::gracefulClose()
PeerImp::sendAndClose(protocol::TMCloseReason reason)
{
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::PeerImp::gracefulClose : strand in this thread");
XRPL_ASSERT(
socket_.is_open(), "ripple::PeerImp::gracefulClose : socket is open");
XRPL_ASSERT(
!gracefulClose_,
"ripple::PeerImp::gracefulClose : socket is not closing");
gracefulClose_ = true;
if (send_queue_.size() > 0)
if (shutdown_)
return;
setTimer();
stream_.async_shutdown(bind_executor(
strand_,
std::bind(
&PeerImp::onShutdown, shared_from_this(), std::placeholders::_1)));
// erase all outstanding messages except for the one
// currently being executed
if (send_queue_.size() > 1)
{
decltype(send_queue_) q({send_queue_.front()});
send_queue_.swap(q);
}
shutdown_ = true;
protocol::TMClose tmGC;
tmGC.set_reason(reason);
send(std::make_shared<Message>(tmGC, protocol::mtCLOSE));
}
void
@@ -713,14 +740,11 @@ PeerImp::onTimer(error_code const& ec)
{
// This should never happen
JLOG(journal_.error()) << "onTimer: " << ec.message();
return close();
return fail(protocol::TMCloseReason::crINTERNAL);
}
if (large_sendq_++ >= Tuning::sendqIntervals)
{
fail("Large send queue");
return;
}
return fail(protocol::TMCloseReason::crLARGE_SEND_QUEUE);
if (auto const t = tracking_.load(); !inbound_ && t != Tracking::converged)
{
@@ -737,17 +761,13 @@ PeerImp::onTimer(error_code const& ec)
(duration > app_.config().MAX_UNKNOWN_TIME)))
{
overlay_.peerFinder().on_failure(slot_);
fail("Not useful");
return;
return fail(protocol::TMCloseReason::crNOT_USEFUL);
}
}
// Already waiting for PONG
if (lastPingSeq_)
{
fail("Ping Timeout");
return;
}
return fail(protocol::TMCloseReason::crPING_TIMEOUT);
lastPingTime_ = clock_type::now();
lastPingSeq_ = rand_int<std::uint32_t>();
@@ -761,21 +781,6 @@ PeerImp::onTimer(error_code const& ec)
setTimer();
}
void
PeerImp::onShutdown(error_code ec)
{
cancelTimer();
// If we don't get eof then something went wrong
if (!ec)
{
JLOG(journal_.error()) << "onShutdown: expected error condition";
return close();
}
if (ec != boost::asio::error::eof)
return fail("onShutdown", ec);
close();
}
//------------------------------------------------------------------------------
void
PeerImp::doAccept()
@@ -791,7 +796,10 @@ PeerImp::doAccept()
// This shouldn't fail since we already computed
// the shared value successfully in OverlayImpl
if (!sharedValue)
return fail("makeSharedValue: Unexpected failure");
{
JLOG(journal_.error()) << "doAccept: makeSharedValue failed";
return fail(protocol::TMCloseReason::crINTERNAL);
}
JLOG(journal_.info()) << "Protocol: " << to_string(protocol_);
JLOG(journal_.info()) << "Public Key: "
@@ -841,7 +849,9 @@ PeerImp::doAccept()
return fail("onWriteResponse", ec);
if (write_buffer->size() == bytes_transferred)
return doProtocolStart();
return fail("Failed to write header");
JLOG(journal_.error()) << "Failed to write header";
return fail(protocol::TMCloseReason::crINTERNAL);
}));
}
@@ -905,15 +915,19 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
{
if (!socket_.is_open())
return;
if (ec == boost::asio::error::operation_aborted)
// we started closing the local connection, stop reading
if (ec == boost::asio::error::operation_aborted || shutdown_)
return;
if (ec == boost::asio::error::eof)
{
// Peer initiated connection close, just clean up
JLOG(journal_.info()) << "EOF";
return gracefulClose();
return close();
}
if (ec)
return fail("onReadMessage", ec);
if (auto stream = journal_.trace())
{
if (bytes_transferred > 0)
@@ -945,8 +959,6 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
return fail("onReadMessage", ec);
if (!socket_.is_open())
return;
if (gracefulClose_)
return;
if (bytes_consumed == 0)
break;
read_buffer_.consume(bytes_consumed);
@@ -969,6 +981,7 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
{
if (!socket_.is_open())
return;
if (ec == boost::asio::error::operation_aborted)
return;
if (ec)
@@ -1002,16 +1015,9 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
std::placeholders::_1,
std::placeholders::_2)));
}
if (gracefulClose_)
{
return stream_.async_shutdown(bind_executor(
strand_,
std::bind(
&PeerImp::onShutdown,
shared_from_this(),
std::placeholders::_1)));
}
// If the send queue is empty and we are shutting down, close the connection
else if (shutdown_)
close();
}
//------------------------------------------------------------------------------
@@ -2746,6 +2752,20 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
<< "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
}
void
PeerImp::onMessage(std::shared_ptr<protocol::TMClose> const& m)
{
if (m->has_reason())
{
JLOG(p_journal_.debug())
<< "onMessage: TMClose: peer closed the connection: "
<< to_string(m->reason());
}
// do not send a close message when the peer initiates the shutdown
close();
}
//--------------------------------------------------------------------------
void

View File

@@ -95,7 +95,7 @@ private:
std::atomic<Tracking> tracking_;
clock_type::time_point trackingTime_;
bool detaching_ = false;
bool shutdown_ = false;
// Node public key of peer.
PublicKey const publicKey_;
std::string name_;
@@ -175,7 +175,6 @@ private:
http_response_type response_;
boost::beast::http::fields const& headers_;
std::queue<std::shared_ptr<Message>> send_queue_;
bool gracefulClose_ = false;
int large_sendq_ = 0;
std::unique_ptr<LoadEvent> load_event_;
// The highest sequence of each PublisherList that has
@@ -426,7 +425,7 @@ public:
isHighLatency() const override;
void
fail(std::string const& reason);
fail(protocol::TMCloseReason reason);
bool
compressionEnabled() const override
@@ -445,10 +444,10 @@ private:
close();
void
fail(std::string const& name, error_code ec);
sendAndClose(protocol::TMCloseReason reason);
void
gracefulClose();
fail(std::string const& name, error_code ec);
void
setTimer();
@@ -463,10 +462,6 @@ private:
void
onTimer(boost::system::error_code const& ec);
// Called when SSL shutdown completes
void
onShutdown(error_code ec);
void
doAccept();
@@ -584,6 +579,8 @@ public:
onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m);
void
onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m);
void
onMessage(std::shared_ptr<protocol::TMClose> const& m);
private:
//--------------------------------------------------------------------------

View File

@@ -104,8 +104,8 @@ protocolMessageName(int type)
return "replay_delta_request";
case protocol::mtREPLAY_DELTA_RESPONSE:
return "replay_delta_response";
default:
break;
case protocol::mtCLOSE:
return "close";
}
return "unknown";
}
@@ -470,6 +470,10 @@ invokeProtocolMessage(
success = detail::invoke<protocol::TMReplayDeltaResponse>(
*header, buffers, handler);
break;
case protocol::mtCLOSE:
success =
detail::invoke<protocol::TMClose>(*header, buffers, handler);
break;
default:
handler.onMessageUnknown(header->message_type);
success = true;

View File

@@ -46,7 +46,7 @@ std::unordered_map<protocol::MessageType, TrafficCount::category> const
{protocol::mtTRANSACTIONS,
TrafficCount::category::requested_transactions},
{protocol::mtSQUELCH, TrafficCount::category::squelch},
};
{protocol::mtCLOSE, TrafficCount::category::close}};
TrafficCount::category
TrafficCount::categorize(

View File

@@ -195,7 +195,7 @@ public:
// The total p2p bytes sent and received on the wire
total,
close,
unknown // must be last
};
@@ -304,6 +304,7 @@ public:
{replay_delta_response, "replay_delta_response"},
{have_transactions, "have_transactions"},
{requested_transactions, "requested_transactions"},
{close, "close"},
{total, "total"}};
if (auto it = category_map.find(cat); it != category_map.end())
@@ -370,6 +371,7 @@ protected:
{have_transactions, {have_transactions}},
{requested_transactions, {requested_transactions}},
{total, {total}},
{close, {close}},
{unknown, {unknown}},
};
};