adds isTrusted parameter to updateSlotAndSquelch method to differentiate messages from trusted validators

This commit is contained in:
Vito
2025-05-28 16:09:23 +02:00
parent 34c3591554
commit 69aec23e1b
4 changed files with 81 additions and 71 deletions

View File

@@ -121,13 +121,15 @@ private:
Slot( Slot(
SquelchHandler const& handler, SquelchHandler const& handler,
beast::Journal journal, beast::Journal journal,
uint16_t maxSelectedPeers) uint16_t maxSelectedPeers,
bool isTrusted)
: reachedThreshold_(0) : reachedThreshold_(0)
, lastSelected_(clock_type::now()) , lastSelected_(clock_type::now())
, state_(SlotState::Counting) , state_(SlotState::Counting)
, handler_(handler) , handler_(handler)
, journal_(journal) , journal_(journal)
, maxSelectedPeers_(maxSelectedPeers) , maxSelectedPeers_(maxSelectedPeers)
, isTrusted_(isTrusted)
{ {
} }
@@ -139,22 +141,19 @@ private:
* MIN_MESSAGE_THRESHOLD then add peer to considered peers pool. If the * MIN_MESSAGE_THRESHOLD then add peer to considered peers pool. If the
* number of considered peers who reached MAX_MESSAGE_THRESHOLD is * number of considered peers who reached MAX_MESSAGE_THRESHOLD is
* maxSelectedPeers_ then randomly select maxSelectedPeers_ from * maxSelectedPeers_ then randomly select maxSelectedPeers_ from
* considered peers, and call squelch handler for each peer, which is not * considered peers, and call squelch handler for each peer, which is
* selected and not already in Squelched state. Set the state for those * not selected and not already in Squelched state. Set the state for
* peers to Squelched and reset the count of all peers. Set slot's state to * those peers to Squelched and reset the count of all peers. Set slot's
* Selected. Message count is not updated when the slot is in Selected * state to Selected. Message count is not updated when the slot is in
* state. * Selected state.
* @param validator Public key of the source validator * @param validator Public key of the source validator
* @param id Peer id which received the message * @param id Peer id which received the message
* @param type Message type (Validation and Propose Set only,
* others are ignored, future use)
* @param callback A callback to report ignored squelches * @param callback A callback to report ignored squelches
*/ */
void void
update( update(
PublicKey const& validator, PublicKey const& validator,
id_t id, id_t id,
protocol::MessageType type,
ignored_squelch_callback callback); ignored_squelch_callback callback);
/** Handle peer deletion when a peer disconnects. /** Handle peer deletion when a peer disconnects.
@@ -257,6 +256,9 @@ private:
// the maximum number of peers that should be selected as a validator // the maximum number of peers that should be selected as a validator
// message source // message source
uint16_t const maxSelectedPeers_; uint16_t const maxSelectedPeers_;
// indicate if the slot is for a trusted validator
bool const isTrusted_;
}; };
template <typename clock_type> template <typename clock_type>
@@ -287,7 +289,6 @@ void
Slot<clock_type>::update( Slot<clock_type>::update(
PublicKey const& validator, PublicKey const& validator,
id_t id, id_t id,
protocol::MessageType type,
ignored_squelch_callback callback) ignored_squelch_callback callback)
{ {
using namespace std::chrono; using namespace std::chrono;
@@ -321,8 +322,7 @@ Slot<clock_type>::update(
<< " slot state " << static_cast<int>(state_) << " peer state " << " slot state " << static_cast<int>(state_) << " peer state "
<< static_cast<int>(peer.state) << " count " << peer.count << " last " << static_cast<int>(peer.state) << " count " << peer.count << " last "
<< duration_cast<milliseconds>(now - peer.lastMessage).count() << duration_cast<milliseconds>(now - peer.lastMessage).count()
<< " pool " << considered_.size() << " threshold " << reachedThreshold_ << " pool " << considered_.size() << " threshold " << reachedThreshold_;
<< " " << (type == protocol::mtVALIDATION ? "validation" : "proposal");
peer.lastMessage = now; peer.lastMessage = now;
@@ -605,37 +605,39 @@ public:
return reduceRelayReady_; return reduceRelayReady_;
} }
/** Calls Slot::update of Slot associated with the validator, with a noop /** Calls Slot::update of Slot associated with the validator, with a
* callback. * noop callback.
* @param key Message's hash * @param key Message's hash
* @param validator Validator's public key * @param validator Validator's public key
* @param id Peer's id which received the message * @param id Peer's id which received the message
* @param type Received protocol message type * @param isTrusted Boolean to indicate if the message is from a trusted
* validator
*/ */
void void
updateSlotAndSquelch( updateSlotAndSquelch(
uint256 const& key, uint256 const& key,
PublicKey const& validator, PublicKey const& validator,
id_t id, id_t id,
protocol::MessageType type) bool isTrusted)
{ {
updateSlotAndSquelch(key, validator, id, type, []() {}); updateSlotAndSquelch(key, validator, id, []() {}, isTrusted);
} }
/** Calls Slot::update of Slot associated with the validator. /** Calls Slot::update of Slot associated with the validator.
* @param key Message's hash * @param key Message's hash
* @param validator Validator's public key * @param validator Validator's public key
* @param id Peer's id which received the message * @param id Peer's id which received the message
* @param type Received protocol message type
* @param callback A callback to report ignored validations * @param callback A callback to report ignored validations
* @param isTrusted Boolean to indicate if the message is from a trusted
* validator
*/ */
void void
updateSlotAndSquelch( updateSlotAndSquelch(
uint256 const& key, uint256 const& key,
PublicKey const& validator, PublicKey const& validator,
id_t id, id_t id,
protocol::MessageType type, typename Slot<clock_type>::ignored_squelch_callback callback,
typename Slot<clock_type>::ignored_squelch_callback callback); bool isTrusted);
/** Check if peers stopped relaying messages /** Check if peers stopped relaying messages
* and if slots stopped receiving messages from the validator. * and if slots stopped receiving messages from the validator.
@@ -780,8 +782,8 @@ Slots<clock_type>::updateSlotAndSquelch(
uint256 const& key, uint256 const& key,
PublicKey const& validator, PublicKey const& validator,
id_t id, id_t id,
protocol::MessageType type, typename Slot<clock_type>::ignored_squelch_callback callback,
typename Slot<clock_type>::ignored_squelch_callback callback) bool isTrusted)
{ {
if (!addPeerMessage(key, id)) if (!addPeerMessage(key, id))
return; return;

View File

@@ -1415,7 +1415,7 @@ OverlayImpl::updateSlotAndSquelch(
uint256 const& key, uint256 const& key,
PublicKey const& validator, PublicKey const& validator,
std::set<Peer::id_t>&& peers, std::set<Peer::id_t>&& peers,
protocol::MessageType type) bool isTrusted)
{ {
if (!slots_.baseSquelchReady()) if (!slots_.baseSquelchReady())
return; return;
@@ -1423,14 +1423,22 @@ OverlayImpl::updateSlotAndSquelch(
if (!strand_.running_in_this_thread()) if (!strand_.running_in_this_thread())
return post( return post(
strand_, strand_,
[this, key, validator, peers = std::move(peers), type]() mutable { [this,
updateSlotAndSquelch(key, validator, std::move(peers), type); key,
validator,
peers = std::move(peers),
isTrusted]() mutable {
updateSlotAndSquelch(
key, validator, std::move(peers), isTrusted);
}); });
for (auto id : peers) for (auto id : peers)
slots_.updateSlotAndSquelch(key, validator, id, type, [&]() { slots_.updateSlotAndSquelch(
reportInboundTraffic(TrafficCount::squelch_ignored, 0); key,
}); validator,
id,
[&]() { reportInboundTraffic(TrafficCount::squelch_ignored, 0); },
isTrusted);
} }
void void
@@ -1438,19 +1446,22 @@ OverlayImpl::updateSlotAndSquelch(
uint256 const& key, uint256 const& key,
PublicKey const& validator, PublicKey const& validator,
Peer::id_t peer, Peer::id_t peer,
protocol::MessageType type) bool isTrusted)
{ {
if (!slots_.baseSquelchReady()) if (!slots_.baseSquelchReady())
return; return;
if (!strand_.running_in_this_thread()) if (!strand_.running_in_this_thread())
return post(strand_, [this, key, validator, peer, type]() { return post(strand_, [this, key, validator, peer, isTrusted]() {
updateSlotAndSquelch(key, validator, peer, type); updateSlotAndSquelch(key, validator, peer, isTrusted);
}); });
slots_.updateSlotAndSquelch(key, validator, peer, type, [&]() { slots_.updateSlotAndSquelch(
reportInboundTraffic(TrafficCount::squelch_ignored, 0); key,
}); validator,
peer,
[&]() { reportInboundTraffic(TrafficCount::squelch_ignored, 0); },
isTrusted);
} }
void void

View File

@@ -399,14 +399,14 @@ public:
* @param key Unique message's key * @param key Unique message's key
* @param validator Validator's public key * @param validator Validator's public key
* @param peers Peers' id to update the slots for * @param peers Peers' id to update the slots for
* @param type Received protocol message type * @param isTrusted Indicate if the validator is trusted
*/ */
void void
updateSlotAndSquelch( updateSlotAndSquelch(
uint256 const& key, uint256 const& key,
PublicKey const& validator, PublicKey const& validator,
std::set<Peer::id_t>&& peers, std::set<Peer::id_t>&& peers,
protocol::MessageType type); bool isTrusted);
/** Overload to reduce allocation in case of single peer /** Overload to reduce allocation in case of single peer
*/ */
@@ -415,7 +415,7 @@ public:
uint256 const& key, uint256 const& key,
PublicKey const& validator, PublicKey const& validator,
Peer::id_t peer, Peer::id_t peer,
protocol::MessageType type); bool isTrusted);
/** Called when the peer is deleted. If the peer was selected to be the /** Called when the peer is deleted. If the peer was selected to be the
* source of messages from the validator then squelched peers have to be * source of messages from the validator then squelched peers have to be

View File

@@ -1699,21 +1699,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
// suppression for 30 seconds to avoid doing a relatively expensive lookup // suppression for 30 seconds to avoid doing a relatively expensive lookup
// every time a spam packet is received // every time a spam packet is received
PublicKey const publicKey{makeSlice(set.nodepubkey())}; PublicKey const publicKey{makeSlice(set.nodepubkey())};
auto const isTrusted = app_.validators().trusted(publicKey);
// If the operator has specified that untrusted proposals be dropped then
// this happens here I.e. before further wasting CPU verifying the signature
// of an untrusted key
if (!isTrusted)
{
// report untrusted proposal messages
overlay_.reportInboundTraffic(
TrafficCount::category::proposal_untrusted,
Message::messageSize(*m));
if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
return;
}
uint256 const proposeHash{set.currenttxhash()}; uint256 const proposeHash{set.currenttxhash()};
uint256 const prevLedger{set.previousledger()}; uint256 const prevLedger{set.previousledger()};
@@ -1728,7 +1713,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
publicKey.slice(), publicKey.slice(),
sig); sig);
if (auto [added, relayed] = auto const isTrusted = app_.validators().trusted(publicKey);
if (auto const& [added, relayed] =
app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_); app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
!added) !added)
{ {
@@ -1736,7 +1723,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
// receives within IDLED seconds since the message has been relayed. // receives within IDLED seconds since the message has been relayed.
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED) if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
overlay_.updateSlotAndSquelch( overlay_.updateSlotAndSquelch(
suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER); suppression, publicKey, id_, isTrusted);
// report duplicate proposal messages // report duplicate proposal messages
overlay_.reportInboundTraffic( overlay_.reportInboundTraffic(
@@ -1750,6 +1737,16 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
if (!isTrusted) if (!isTrusted)
{ {
overlay_.reportInboundTraffic(
TrafficCount::category::proposal_untrusted,
Message::messageSize(*m));
// If the operator has specified that untrusted proposals be dropped
// then this happens here I.e. before further wasting CPU verifying the
// signature of an untrusted key
if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
return;
if (tracking_.load() == Tracking::diverged) if (tracking_.load() == Tracking::diverged)
{ {
JLOG(p_journal_.debug()) JLOG(p_journal_.debug())
@@ -2358,20 +2355,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
auto const isTrusted = auto const isTrusted =
app_.validators().trusted(val->getSignerPublic()); app_.validators().trusted(val->getSignerPublic());
// If the operator has specified that untrusted validations be
// dropped then this happens here I.e. before further wasting CPU
// verifying the signature of an untrusted key
if (!isTrusted)
{
// increase untrusted validations received
overlay_.reportInboundTraffic(
TrafficCount::category::validation_untrusted,
Message::messageSize(*m));
if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
return;
}
auto key = sha512Half(makeSlice(m->validation())); auto key = sha512Half(makeSlice(m->validation()));
auto [added, relayed] = auto [added, relayed] =
@@ -2384,7 +2367,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
// relayed. // relayed.
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED) if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
overlay_.updateSlotAndSquelch( overlay_.updateSlotAndSquelch(
key, val->getSignerPublic(), id_, protocol::mtVALIDATION); key, val->getSignerPublic(), id_, isTrusted);
// increase duplicate validations received // increase duplicate validations received
overlay_.reportInboundTraffic( overlay_.reportInboundTraffic(
@@ -2395,6 +2378,20 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
return; return;
} }
// at this point the message is guaranteed to be unique
if (!isTrusted)
{
overlay_.reportInboundTraffic(
TrafficCount::category::validation_untrusted,
Message::messageSize(*m));
// If the operator has specified that untrusted validations be
// dropped then this happens here I.e. before further wasting CPU
// verifying the signature of an untrusted key
if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
return;
}
if (!isTrusted && (tracking_.load() == Tracking::diverged)) if (!isTrusted && (tracking_.load() == Tracking::diverged))
{ {
JLOG(p_journal_.debug()) JLOG(p_journal_.debug())
@@ -3008,7 +3005,7 @@ PeerImp::checkPropose(
peerPos.suppressionID(), peerPos.suppressionID(),
peerPos.publicKey(), peerPos.publicKey(),
std::move(haveMessage), std::move(haveMessage),
protocol::mtPROPOSE_LEDGER); isTrusted);
} }
} }
@@ -3044,7 +3041,7 @@ PeerImp::checkValidation(
key, key,
val->getSignerPublic(), val->getSignerPublic(),
std::move(haveMessage), std::move(haveMessage),
protocol::mtVALIDATION); val->isTrusted());
} }
} }
} }