From e3bfbaef80a9f6b5e9330af127082abb83a134dd Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Mon, 8 Sep 2025 13:20:04 +0200 Subject: [PATCH] edits --- src/xrpld/overlay/detail/ConnectAttempt.cpp | 3 +- src/xrpld/overlay/detail/InboundHandshake.cpp | 4 +- src/xrpld/overlay/detail/PeerImp.cpp | 16 +- src/xrpld/overlay/detail/PeerImp.h | 11 +- .../handlers/ProtocolMessageHandler.cpp | 144 ++++++++++++++++++ .../detail/handlers/ProtocolMessageHandler.h | 142 +++++++++++++++++ 6 files changed, 305 insertions(+), 15 deletions(-) create mode 100644 src/xrpld/overlay/detail/handlers/ProtocolMessageHandler.cpp create mode 100644 src/xrpld/overlay/detail/handlers/ProtocolMessageHandler.h diff --git a/src/xrpld/overlay/detail/ConnectAttempt.cpp b/src/xrpld/overlay/detail/ConnectAttempt.cpp index 6138a530bf..3d97f58a33 100644 --- a/src/xrpld/overlay/detail/ConnectAttempt.cpp +++ b/src/xrpld/overlay/detail/ConnectAttempt.cpp @@ -425,7 +425,8 @@ ConnectAttempt::processResponse() *negotiatedProtocol, id_, attributes, - overlay_); + overlay_, + app_.cluster().member(publicKey).value_or("")); overlay_.add_active(peer); } diff --git a/src/xrpld/overlay/detail/InboundHandshake.cpp b/src/xrpld/overlay/detail/InboundHandshake.cpp index 30721b2230..3e2d7bb9cb 100644 --- a/src/xrpld/overlay/detail/InboundHandshake.cpp +++ b/src/xrpld/overlay/detail/InboundHandshake.cpp @@ -17,6 +17,7 @@ */ //============================================================================== +#include #include #include #include @@ -207,7 +208,8 @@ InboundHandshake::createPeer() protocolVersion_, attributes_, publicKey_, - id_); + id_, + app_.cluster().member(publicKey_).value_or("")); // Add the peer to the overlay overlay_.add_active(peer); diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index 48641fa34b..d15638daf1 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -50,6 +50,7 @@ #include #include #include +#include using namespace std::chrono_literals; @@ -157,7 +158,8 @@ PeerImp::PeerImp( ProtocolVersion protocol, PeerAttributes const& attributes, PublicKey const& publicKey, - id_t id) + id_t id, + std::string const& name) : Child(overlay) , app_(app) , id_(id) @@ -175,6 +177,7 @@ PeerImp::PeerImp( , tracking_(Tracking::unknown) , trackingTime_(clock_type::now()) , publicKey_(publicKey) + , name_(name) , lastPingTime_(clock_type::now()) , creationTime_(clock_type::now()) , squelch_(app_.journal("Squelch")) @@ -419,7 +422,7 @@ PeerImp::crawl() const bool PeerImp::cluster() const { - return static_cast(app_.cluster().member(publicKey_)); + return app_.cluster().member(publicKey_).has_value(); } std::string @@ -835,11 +838,7 @@ PeerImp::doAccept() if (auto member = app_.cluster().member(publicKey_)) { - { - std::unique_lock lock{nameMutex_}; - name_ = *member; - } - JLOG(journal_.info()) << "Cluster name: " << *member; + JLOG(journal_.info()) << "Cluster name: " << name_; } doProtocolStart(); @@ -848,9 +847,8 @@ PeerImp::doAccept() std::string PeerImp::name() const { - std::shared_lock read_lock{nameMutex_}; return name_; -} +} std::string PeerImp::domain() const diff --git a/src/xrpld/overlay/detail/PeerImp.h b/src/xrpld/overlay/detail/PeerImp.h index ecca36c244..5922c3d2c1 100644 --- a/src/xrpld/overlay/detail/PeerImp.h +++ b/src/xrpld/overlay/detail/PeerImp.h @@ -210,7 +210,6 @@ private: // Node public key of peer. PublicKey const publicKey_; std::string name_; - std::shared_mutex mutable nameMutex_; // The indices of the smallest and largest ledgers this peer has available // @@ -268,7 +267,8 @@ public: ProtocolVersion protocol, PeerAttributes const& attributes, PublicKey const& publicKey, - id_t id); + id_t id, + std::string const& name); /** Create outgoing, handshaked peer. */ template @@ -282,7 +282,8 @@ public: ProtocolVersion protocol, id_t id, PeerAttributes const& attributes, - OverlayImpl& overlay); + OverlayImpl& overlay, + std::string const& name); virtual ~PeerImp(); @@ -684,7 +685,8 @@ PeerImp::PeerImp( ProtocolVersion protocol, id_t id, PeerAttributes const& attributes, - OverlayImpl& overlay) + OverlayImpl& overlay, + std::string const& name) : Child(overlay) , app_(app) , id_(id) @@ -702,6 +704,7 @@ PeerImp::PeerImp( , tracking_(Tracking::unknown) , trackingTime_(clock_type::now()) , publicKey_(publicKey) + , name_(name) , lastPingTime_(clock_type::now()) , creationTime_(clock_type::now()) , squelch_(app_.journal("Squelch")) diff --git a/src/xrpld/overlay/detail/handlers/ProtocolMessageHandler.cpp b/src/xrpld/overlay/detail/handlers/ProtocolMessageHandler.cpp new file mode 100644 index 0000000000..2363bace0a --- /dev/null +++ b/src/xrpld/overlay/detail/handlers/ProtocolMessageHandler.cpp @@ -0,0 +1,144 @@ + +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace ripple { + +// Helper function to check for valid uint256 values in protobuf buffers +static bool +stringIsUint256Sized(std::string const& pBuffStr) +{ + return pBuffStr.size() == uint256::size(); +} + +void +ProtocolMessageHandler::onMessage( + std::shared_ptr const& m) +{ + protocol::TMProposeSet& set = *m; + + auto const sig = makeSlice(set.signature()); + + // Preliminary check for the validity of the signature: A DER encoded + // signature can't be longer than 72 bytes. + if ((std::clamp(sig.size(), 64, 72) != sig.size()) || + (publicKeyType(makeSlice(set.nodepubkey())) != KeyType::secp256k1)) + { + JLOG(p_journal_.warn()) << "Proposal: malformed"; + fee_.update( + Resource::feeInvalidSignature, + " signature can't be longer than 72 bytes"); + return; + } + + if (!stringIsUint256Sized(set.currenttxhash()) || + !stringIsUint256Sized(set.previousledger())) + { + JLOG(p_journal_.warn()) << "Proposal: malformed"; + fee_.update(Resource::feeMalformedRequest, "bad hashes"); + return; + } + + // RH TODO: when isTrusted = false we should probably also cache a key + // suppression for 30 seconds to avoid doing a relatively expensive + // lookup every time a spam packet is received + PublicKey const publicKey{makeSlice(set.nodepubkey())}; + auto const isTrusted = app_.validators().trusted(publicKey); + + // If the operator has specified that untrusted proposals be dropped + // then this happens here I.e. before further wasting CPU verifying the + // signature of an untrusted key + if (!isTrusted) + { + // report untrusted proposal messages + overlay_.reportInboundTraffic( + TrafficCount::category::proposal_untrusted, + Message::messageSize(*m)); + + if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1) + return; + } + + uint256 const proposeHash{set.currenttxhash()}; + uint256 const prevLedger{set.previousledger()}; + + NetClock::time_point const closeTime{NetClock::duration{set.closetime()}}; + + uint256 const suppression = proposalUniqueId( + proposeHash, + prevLedger, + set.proposeseq(), + closeTime, + publicKey.slice(), + sig); + + if (auto [added, relayed] = + app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_); + !added) + { + // Count unique messages (Slots has it's own 'HashRouter'), which a + // peer receives within IDLED seconds since the message has been + // relayed. + if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED) + overlay_.updateSlotAndSquelch( + suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER); + + // report duplicate proposal messages + overlay_.reportInboundTraffic( + TrafficCount::category::proposal_duplicate, + Message::messageSize(*m)); + + JLOG(p_journal_.trace()) << "Proposal: duplicate"; + + return; + } + + if (!isTrusted) + { + if (tracking_.load() == Tracking::diverged) + { + JLOG(p_journal_.debug()) + << "Proposal: Dropping untrusted (peer divergence)"; + return; + } + + if (!cluster() && app_.getFeeTrack().isLoadedLocal()) + { + JLOG(p_journal_.debug()) << "Proposal: Dropping untrusted (load)"; + return; + } + } + + JLOG(p_journal_.trace()) + << "Proposal: " << (isTrusted ? "trusted" : "untrusted"); + + auto proposal = RCLCxPeerPos( + publicKey, + sig, + suppression, + RCLCxPeerPos::Proposal{ + prevLedger, + set.proposeseq(), + proposeHash, + closeTime, + app_.timeKeeper().closeTime(), + calcNodeID(app_.validatorManifests().getMasterKey(publicKey))}); + + std::weak_ptr weak = shared_from_this(); + app_.getJobQueue().addJob( + isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut, + "recvPropose->checkPropose", + [weak, isTrusted, m, proposal]() { + if (auto peer = weak.lock()) + peer->checkPropose(isTrusted, m, proposal); + }); +} + +} // namespace ripple \ No newline at end of file diff --git a/src/xrpld/overlay/detail/handlers/ProtocolMessageHandler.h b/src/xrpld/overlay/detail/handlers/ProtocolMessageHandler.h new file mode 100644 index 0000000000..4685349cdb --- /dev/null +++ b/src/xrpld/overlay/detail/handlers/ProtocolMessageHandler.h @@ -0,0 +1,142 @@ + +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace ripple { + +class ProtocolMessageHandler +{ +private: + beast::Journal const journal_; + beast::Journal const p_journal_; + OverlayImpl& overlay_; + Application& app_; + +public: + void + onMessageUnknown(std::uint16_t type); + + void + onMessageBegin( + std::uint16_t type, + std::shared_ptr<::google::protobuf::Message> const& m, + std::size_t size, + std::size_t uncompressed_size, + bool isCompressed); + + void + onMessageEnd( + std::uint16_t type, + std::shared_ptr<::google::protobuf::Message> const& m); + + void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + +private: + //-------------------------------------------------------------------------- + // lockedRecentLock is passed as a reminder to callers that recentLock_ + // must be locked. + void + addLedger( + uint256 const& hash, + std::lock_guard const& lockedRecentLock); + + void + doFetchPack(std::shared_ptr const& packet); + + void + onValidatorListMessage( + std::string const& messageType, + std::string const& manifest, + std::uint32_t version, + std::vector const& blobs); + + /** Process peer's request to send missing transactions. The request is + sent in response to TMHaveTransactions. + @param packet protocol message containing missing transactions' hashes. + */ + void + doTransactions(std::shared_ptr const& packet); + + void + checkTransaction( + HashRouterFlags flags, + bool checkSignature, + std::shared_ptr const& stx, + bool batch); + + void + checkPropose( + bool isTrusted, + std::shared_ptr const& packet, + RCLCxPeerPos peerPos); + + void + checkValidation( + std::shared_ptr const& val, + uint256 const& key, + std::shared_ptr const& packet); + + void + sendLedgerBase( + std::shared_ptr const& ledger, + protocol::TMLedgerData& ledgerData); + + std::shared_ptr + getLedger(std::shared_ptr const& m); + + std::shared_ptr + getTxSet(std::shared_ptr const& m) const; + + void + processLedgerRequest(std::shared_ptr const& m); +}; + +} // namespace ripple \ No newline at end of file