fix: Replace charge() by fee_.update() in OnMessage functions (#5269)

In PeerImpl.cpp, if the function is a message handler (onMessage) or called directly from a message handler, then it should use fee_, since when the handler returns (OnMessageEnd) then the charge function is called. If the function is not a message handler, such as a job queue item, it should remain charge.
This commit is contained in:
Bart
2025-02-13 11:54:01 -05:00
committed by GitHub
parent e8e7888a23
commit 97e3dae6f4
3 changed files with 41 additions and 21 deletions

View File

@@ -57,6 +57,9 @@ public:
std::strong_ordering std::strong_ordering
operator<=>(Charge const&) const; operator<=>(Charge const&) const;
Charge
operator*(value_type m) const;
private: private:
value_type m_cost; value_type m_cost;
std::string m_label; std::string m_label;

View File

@@ -67,5 +67,11 @@ Charge::operator<=>(Charge const& c) const
return m_cost <=> c.m_cost; return m_cost <=> c.m_cost;
} }
Charge
Charge::operator*(value_type m) const
{
return Charge(m_cost * m, m_label);
}
} // namespace Resource } // namespace Resource
} // namespace ripple } // namespace ripple

View File

@@ -31,14 +31,11 @@
#include <xrpld/overlay/Cluster.h> #include <xrpld/overlay/Cluster.h>
#include <xrpld/overlay/detail/PeerImp.h> #include <xrpld/overlay/detail/PeerImp.h>
#include <xrpld/overlay/detail/Tuning.h> #include <xrpld/overlay/detail/Tuning.h>
#include <xrpld/overlay/predicates.h>
#include <xrpld/perflog/PerfLog.h> #include <xrpld/perflog/PerfLog.h>
#include <xrpl/basics/UptimeClock.h> #include <xrpl/basics/UptimeClock.h>
#include <xrpl/basics/base64.h> #include <xrpl/basics/base64.h>
#include <xrpl/basics/random.h> #include <xrpl/basics/random.h>
#include <xrpl/basics/safe_cast.h> #include <xrpl/basics/safe_cast.h>
#include <xrpl/beast/core/LexicalCast.h>
// #include <xrpl/beast/core/SemanticVersion.h>
#include <xrpl/protocol/digest.h> #include <xrpl/protocol/digest.h>
#include <boost/algorithm/string/predicate.hpp> #include <boost/algorithm/string/predicate.hpp>
@@ -1111,7 +1108,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMCluster> const& m)
// VFALCO NOTE I think we should drop the peer immediately // VFALCO NOTE I think we should drop the peer immediately
if (!cluster()) if (!cluster())
{ {
fee_.fee = Resource::feeUselessData; fee_.update(Resource::feeUselessData, "unknown cluster");
return; return;
} }
@@ -1189,13 +1186,14 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
// implication for the protocol. // implication for the protocol.
if (m->endpoints_v2().size() >= 1024) if (m->endpoints_v2().size() >= 1024)
{ {
charge(Resource::feeInvalidData, "endpoints too large"); fee_.update(Resource::feeUselessData, "endpoints too large");
return; return;
} }
std::vector<PeerFinder::Endpoint> endpoints; std::vector<PeerFinder::Endpoint> endpoints;
endpoints.reserve(m->endpoints_v2().size()); endpoints.reserve(m->endpoints_v2().size());
auto malformed = 0;
for (auto const& tm : m->endpoints_v2()) for (auto const& tm : m->endpoints_v2())
{ {
auto result = beast::IP::Endpoint::from_string_checked(tm.endpoint()); auto result = beast::IP::Endpoint::from_string_checked(tm.endpoint());
@@ -1204,7 +1202,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
{ {
JLOG(p_journal_.error()) << "failed to parse incoming endpoint: {" JLOG(p_journal_.error()) << "failed to parse incoming endpoint: {"
<< tm.endpoint() << "}"; << tm.endpoint() << "}";
charge(Resource::feeInvalidData, "endpoints malformed"); malformed++;
continue; continue;
} }
@@ -1220,6 +1218,15 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
endpoints.emplace_back(*result, tm.hops()); endpoints.emplace_back(*result, tm.hops());
} }
// Charge the peer for each malformed endpoint. As there still may be
// multiple valid endpoints we don't return early.
if (malformed > 0)
{
fee_.update(
Resource::feeInvalidData * malformed,
std::to_string(malformed) + " malformed endpoints");
}
if (!endpoints.empty()) if (!endpoints.empty())
overlay_.peerFinder().on_endpoints(slot_, endpoints); overlay_.peerFinder().on_endpoints(slot_, endpoints);
} }
@@ -1340,7 +1347,7 @@ void
PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
{ {
auto badData = [&](std::string const& msg) { auto badData = [&](std::string const& msg) {
charge(Resource::feeInvalidData, "get_ledger " + msg); fee_.update(Resource::feeInvalidData, "get_ledger " + msg);
JLOG(p_journal_.warn()) << "TMGetLedger: " << msg; JLOG(p_journal_.warn()) << "TMGetLedger: " << msg;
}; };
auto const itype{m->itype()}; auto const itype{m->itype()};
@@ -1431,7 +1438,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest"; JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest";
if (!ledgerReplayEnabled_) if (!ledgerReplayEnabled_)
{ {
charge(Resource::feeMalformedRequest, "proof_path_request disabled"); fee_.update(
Resource::feeMalformedRequest, "proof_path_request disabled");
return; return;
} }
@@ -1468,13 +1476,14 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m)
{ {
if (!ledgerReplayEnabled_) if (!ledgerReplayEnabled_)
{ {
charge(Resource::feeMalformedRequest, "proof_path_response disabled"); fee_.update(
Resource::feeMalformedRequest, "proof_path_response disabled");
return; return;
} }
if (!ledgerReplayMsgHandler_.processProofPathResponse(m)) if (!ledgerReplayMsgHandler_.processProofPathResponse(m))
{ {
charge(Resource::feeInvalidData, "proof_path_response"); fee_.update(Resource::feeInvalidData, "proof_path_response");
} }
} }
@@ -1484,7 +1493,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest"; JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest";
if (!ledgerReplayEnabled_) if (!ledgerReplayEnabled_)
{ {
charge(Resource::feeMalformedRequest, "replay_delta_request disabled"); fee_.update(
Resource::feeMalformedRequest, "replay_delta_request disabled");
return; return;
} }
@@ -1521,13 +1531,14 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m)
{ {
if (!ledgerReplayEnabled_) if (!ledgerReplayEnabled_)
{ {
charge(Resource::feeMalformedRequest, "replay_delta_response disabled"); fee_.update(
Resource::feeMalformedRequest, "replay_delta_response disabled");
return; return;
} }
if (!ledgerReplayMsgHandler_.processReplayDeltaResponse(m)) if (!ledgerReplayMsgHandler_.processReplayDeltaResponse(m))
{ {
charge(Resource::feeInvalidData, "replay_delta_response"); fee_.update(Resource::feeInvalidData, "replay_delta_response");
} }
} }
@@ -2408,10 +2419,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
return; return;
} }
fee_.update(
Resource::feeModerateBurdenPeer,
" received a get object by hash request");
protocol::TMGetObjectByHash reply; protocol::TMGetObjectByHash reply;
reply.set_query(false); reply.set_query(false);
@@ -2432,6 +2439,10 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
reply.set_ledgerhash(packet.ledgerhash()); reply.set_ledgerhash(packet.ledgerhash());
} }
fee_.update(
Resource::feeModerateBurdenPeer,
" received a get object by hash request");
// This is a very minimal implementation // This is a very minimal implementation
for (int i = 0; i < packet.objects_size(); ++i) for (int i = 0; i < packet.objects_size(); ++i)
{ {
@@ -2628,14 +2639,14 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
if (!m->has_validatorpubkey()) if (!m->has_validatorpubkey())
{ {
charge(Resource::feeInvalidData, "squelch no pubkey"); fee_.update(Resource::feeInvalidData, "squelch no pubkey");
return; return;
} }
auto validator = m->validatorpubkey(); auto validator = m->validatorpubkey();
auto const slice{makeSlice(validator)}; auto const slice{makeSlice(validator)};
if (!publicKeyType(slice)) if (!publicKeyType(slice))
{ {
charge(Resource::feeInvalidData, "squelch bad pubkey"); fee_.update(Resource::feeInvalidData, "squelch bad pubkey");
return; return;
} }
PublicKey key(slice); PublicKey key(slice);
@@ -2643,7 +2654,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
// Ignore non-validator squelch // Ignore non-validator squelch
if (!app_.validators().listed(key)) if (!app_.validators().listed(key))
{ {
charge(Resource::feeInvalidData, "squelch non-validator"); fee_.update(Resource::feeInvalidData, "squelch non-validator");
JLOG(p_journal_.debug()) JLOG(p_journal_.debug())
<< "onMessage: TMSquelch discarding non-validator squelch " << "onMessage: TMSquelch discarding non-validator squelch "
<< slice; << slice;
@@ -2663,7 +2674,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
if (!m->squelch()) if (!m->squelch())
squelch_.removeSquelch(key); squelch_.removeSquelch(key);
else if (!squelch_.addSquelch(key, std::chrono::seconds{duration})) else if (!squelch_.addSquelch(key, std::chrono::seconds{duration}))
charge(Resource::feeInvalidData, "squelch duration"); fee_.update(Resource::feeInvalidData, "squelch duration");
JLOG(p_journal_.debug()) JLOG(p_journal_.debug())
<< "onMessage: TMSquelch " << slice << " " << id() << " " << duration; << "onMessage: TMSquelch " << slice << " " << id() << " " << duration;