This commit is contained in:
Vito
2025-09-08 13:20:04 +02:00
parent 2dc49e30c4
commit e3bfbaef80
6 changed files with 305 additions and 15 deletions

View File

@@ -425,7 +425,8 @@ ConnectAttempt::processResponse()
*negotiatedProtocol,
id_,
attributes,
overlay_);
overlay_,
app_.cluster().member(publicKey).value_or(""));
overlay_.add_active(peer);
}

View File

@@ -17,6 +17,7 @@
*/
//==============================================================================
#include <xrpld/overlay/Cluster.h>
#include <xrpld/overlay/detail/Handshake.h>
#include <xrpld/overlay/detail/InboundHandshake.h>
#include <xrpld/overlay/detail/OverlayImpl.h>
@@ -207,7 +208,8 @@ InboundHandshake::createPeer()
protocolVersion_,
attributes_,
publicKey_,
id_);
id_,
app_.cluster().member(publicKey_).value_or(""));
// Add the peer to the overlay
overlay_.add_active(peer);

View File

@@ -50,6 +50,7 @@
#include <mutex>
#include <numeric>
#include <sstream>
#include <string>
using namespace std::chrono_literals;
@@ -157,7 +158,8 @@ PeerImp::PeerImp(
ProtocolVersion protocol,
PeerAttributes const& attributes,
PublicKey const& publicKey,
id_t id)
id_t id,
std::string const& name)
: Child(overlay)
, app_(app)
, id_(id)
@@ -175,6 +177,7 @@ PeerImp::PeerImp(
, tracking_(Tracking::unknown)
, trackingTime_(clock_type::now())
, publicKey_(publicKey)
, name_(name)
, lastPingTime_(clock_type::now())
, creationTime_(clock_type::now())
, squelch_(app_.journal("Squelch"))
@@ -419,7 +422,7 @@ PeerImp::crawl() const
bool
PeerImp::cluster() const
{
return static_cast<bool>(app_.cluster().member(publicKey_));
return app_.cluster().member(publicKey_).has_value();
}
std::string
@@ -835,11 +838,7 @@ PeerImp::doAccept()
if (auto member = app_.cluster().member(publicKey_))
{
{
std::unique_lock lock{nameMutex_};
name_ = *member;
}
JLOG(journal_.info()) << "Cluster name: " << *member;
JLOG(journal_.info()) << "Cluster name: " << name_;
}
doProtocolStart();
@@ -848,9 +847,8 @@ PeerImp::doAccept()
std::string
PeerImp::name() const
{
std::shared_lock read_lock{nameMutex_};
return name_;
}
}
std::string
PeerImp::domain() const

View File

@@ -210,7 +210,6 @@ private:
// Node public key of peer.
PublicKey const publicKey_;
std::string name_;
std::shared_mutex mutable nameMutex_;
// The indices of the smallest and largest ledgers this peer has available
//
@@ -268,7 +267,8 @@ public:
ProtocolVersion protocol,
PeerAttributes const& attributes,
PublicKey const& publicKey,
id_t id);
id_t id,
std::string const& name);
/** Create outgoing, handshaked peer. */
template <class Buffers>
@@ -282,7 +282,8 @@ public:
ProtocolVersion protocol,
id_t id,
PeerAttributes const& attributes,
OverlayImpl& overlay);
OverlayImpl& overlay,
std::string const& name);
virtual ~PeerImp();
@@ -684,7 +685,8 @@ PeerImp::PeerImp(
ProtocolVersion protocol,
id_t id,
PeerAttributes const& attributes,
OverlayImpl& overlay)
OverlayImpl& overlay,
std::string const& name)
: Child(overlay)
, app_(app)
, id_(id)
@@ -702,6 +704,7 @@ PeerImp::PeerImp(
, tracking_(Tracking::unknown)
, trackingTime_(clock_type::now())
, publicKey_(publicKey)
, name_(name)
, lastPingTime_(clock_type::now())
, creationTime_(clock_type::now())
, squelch_(app_.journal("Squelch"))

View File

@@ -0,0 +1,144 @@
#include <xrpld/app/consensus/RCLCxPeerPos.h>
#include <xrpld/app/ledger/Ledger.h>
#include <xrpld/app/misc/HashRouter.h>
#include <xrpld/overlay/detail/handlers/ProtocolMessageHandler.h>
#include <xrpld/shamap/SHAMap.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/STValidation.h>
#include <xrpl/protocol/messages.h>
namespace ripple {
// Helper function to check for valid uint256 values in protobuf buffers
static bool
stringIsUint256Sized(std::string const& pBuffStr)
{
return pBuffStr.size() == uint256::size();
}
void
ProtocolMessageHandler::onMessage(
std::shared_ptr<protocol::TMProposeSet> const& m)
{
protocol::TMProposeSet& set = *m;
auto const sig = makeSlice(set.signature());
// Preliminary check for the validity of the signature: A DER encoded
// signature can't be longer than 72 bytes.
if ((std::clamp<std::size_t>(sig.size(), 64, 72) != sig.size()) ||
(publicKeyType(makeSlice(set.nodepubkey())) != KeyType::secp256k1))
{
JLOG(p_journal_.warn()) << "Proposal: malformed";
fee_.update(
Resource::feeInvalidSignature,
" signature can't be longer than 72 bytes");
return;
}
if (!stringIsUint256Sized(set.currenttxhash()) ||
!stringIsUint256Sized(set.previousledger()))
{
JLOG(p_journal_.warn()) << "Proposal: malformed";
fee_.update(Resource::feeMalformedRequest, "bad hashes");
return;
}
// RH TODO: when isTrusted = false we should probably also cache a key
// suppression for 30 seconds to avoid doing a relatively expensive
// lookup every time a spam packet is received
PublicKey const publicKey{makeSlice(set.nodepubkey())};
auto const isTrusted = app_.validators().trusted(publicKey);
// If the operator has specified that untrusted proposals be dropped
// then this happens here I.e. before further wasting CPU verifying the
// signature of an untrusted key
if (!isTrusted)
{
// report untrusted proposal messages
overlay_.reportInboundTraffic(
TrafficCount::category::proposal_untrusted,
Message::messageSize(*m));
if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
return;
}
uint256 const proposeHash{set.currenttxhash()};
uint256 const prevLedger{set.previousledger()};
NetClock::time_point const closeTime{NetClock::duration{set.closetime()}};
uint256 const suppression = proposalUniqueId(
proposeHash,
prevLedger,
set.proposeseq(),
closeTime,
publicKey.slice(),
sig);
if (auto [added, relayed] =
app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
!added)
{
// Count unique messages (Slots has it's own 'HashRouter'), which a
// peer receives within IDLED seconds since the message has been
// relayed.
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
overlay_.updateSlotAndSquelch(
suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
// report duplicate proposal messages
overlay_.reportInboundTraffic(
TrafficCount::category::proposal_duplicate,
Message::messageSize(*m));
JLOG(p_journal_.trace()) << "Proposal: duplicate";
return;
}
if (!isTrusted)
{
if (tracking_.load() == Tracking::diverged)
{
JLOG(p_journal_.debug())
<< "Proposal: Dropping untrusted (peer divergence)";
return;
}
if (!cluster() && app_.getFeeTrack().isLoadedLocal())
{
JLOG(p_journal_.debug()) << "Proposal: Dropping untrusted (load)";
return;
}
}
JLOG(p_journal_.trace())
<< "Proposal: " << (isTrusted ? "trusted" : "untrusted");
auto proposal = RCLCxPeerPos(
publicKey,
sig,
suppression,
RCLCxPeerPos::Proposal{
prevLedger,
set.proposeseq(),
proposeHash,
closeTime,
app_.timeKeeper().closeTime(),
calcNodeID(app_.validatorManifests().getMasterKey(publicKey))});
std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob(
isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
"recvPropose->checkPropose",
[weak, isTrusted, m, proposal]() {
if (auto peer = weak.lock())
peer->checkPropose(isTrusted, m, proposal);
});
}
} // namespace ripple

View File

@@ -0,0 +1,142 @@
#include <xrpld/app/consensus/RCLCxPeerPos.h>
#include <xrpld/app/ledger/Ledger.h>
#include <xrpld/app/misc/HashRouter.h>
#include <xrpld/overlay/detail/OverlayImpl.h>
#include <xrpld/shamap/SHAMap.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/STValidation.h>
#include <xrpl/protocol/messages.h>
namespace ripple {
class ProtocolMessageHandler
{
private:
beast::Journal const journal_;
beast::Journal const p_journal_;
OverlayImpl& overlay_;
Application& app_;
public:
void
onMessageUnknown(std::uint16_t type);
void
onMessageBegin(
std::uint16_t type,
std::shared_ptr<::google::protobuf::Message> const& m,
std::size_t size,
std::size_t uncompressed_size,
bool isCompressed);
void
onMessageEnd(
std::uint16_t type,
std::shared_ptr<::google::protobuf::Message> const& m);
void
onMessage(std::shared_ptr<protocol::TMManifests> const& m);
void
onMessage(std::shared_ptr<protocol::TMPing> const& m);
void
onMessage(std::shared_ptr<protocol::TMCluster> const& m);
void
onMessage(std::shared_ptr<protocol::TMEndpoints> const& m);
void
onMessage(std::shared_ptr<protocol::TMTransaction> const& m);
void
onMessage(std::shared_ptr<protocol::TMGetLedger> const& m);
void
onMessage(std::shared_ptr<protocol::TMLedgerData> const& m);
void
onMessage(std::shared_ptr<protocol::TMProposeSet> const& m);
void
onMessage(std::shared_ptr<protocol::TMStatusChange> const& m);
void
onMessage(std::shared_ptr<protocol::TMHaveTransactionSet> const& m);
void
onMessage(std::shared_ptr<protocol::TMValidatorList> const& m);
void
onMessage(std::shared_ptr<protocol::TMValidatorListCollection> const& m);
void
onMessage(std::shared_ptr<protocol::TMValidation> const& m);
void
onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m);
void
onMessage(std::shared_ptr<protocol::TMHaveTransactions> const& m);
void
onMessage(std::shared_ptr<protocol::TMTransactions> const& m);
void
onMessage(std::shared_ptr<protocol::TMSquelch> const& m);
void
onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m);
void
onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m);
void
onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m);
void
onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m);
private:
//--------------------------------------------------------------------------
// lockedRecentLock is passed as a reminder to callers that recentLock_
// must be locked.
void
addLedger(
uint256 const& hash,
std::lock_guard<std::mutex> const& lockedRecentLock);
void
doFetchPack(std::shared_ptr<protocol::TMGetObjectByHash> const& packet);
void
onValidatorListMessage(
std::string const& messageType,
std::string const& manifest,
std::uint32_t version,
std::vector<protocol::ValidatorBlobInfo> const& blobs);
/** Process peer's request to send missing transactions. The request is
sent in response to TMHaveTransactions.
@param packet protocol message containing missing transactions' hashes.
*/
void
doTransactions(std::shared_ptr<protocol::TMGetObjectByHash> const& packet);
void
checkTransaction(
HashRouterFlags flags,
bool checkSignature,
std::shared_ptr<STTx const> const& stx,
bool batch);
void
checkPropose(
bool isTrusted,
std::shared_ptr<protocol::TMProposeSet> const& packet,
RCLCxPeerPos peerPos);
void
checkValidation(
std::shared_ptr<STValidation> const& val,
uint256 const& key,
std::shared_ptr<protocol::TMValidation> const& packet);
void
sendLedgerBase(
std::shared_ptr<Ledger const> const& ledger,
protocol::TMLedgerData& ledgerData);
std::shared_ptr<Ledger const>
getLedger(std::shared_ptr<protocol::TMGetLedger> const& m);
std::shared_ptr<SHAMap const>
getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const;
void
processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m);
};
} // namespace ripple