Merge remote-tracking branch 'upstream/master' into merge231

Hotfix: version 2.3.1
  Reduce the peer charges for well-behaved peers
  Update conan in the "nix" CI jobs
This commit is contained in:
Ed Hennis
2025-01-29 18:11:02 -05:00
32 changed files with 400 additions and 162 deletions

View File

@@ -61,10 +61,10 @@ Charge::operator==(Charge const& c) const
return c.m_cost == m_cost;
}
bool
Charge::operator!=(Charge const& c) const
std::strong_ordering
Charge::operator<=>(Charge const& c) const
{
return c.m_cost != m_cost;
return m_cost <=> c.m_cost;
}
} // namespace Resource

View File

@@ -96,12 +96,12 @@ Consumer::disposition() const
}
Disposition
Consumer::charge(Charge const& what)
Consumer::charge(Charge const& what, std::string const& context)
{
Disposition d = ok;
if (m_logic && m_entry && !m_entry->isUnlimited())
d = m_logic->charge(*m_entry, what);
d = m_logic->charge(*m_entry, what, context);
return d;
}

View File

@@ -22,24 +22,26 @@
namespace ripple {
namespace Resource {
Charge const feeInvalidRequest(100, "malformed request");
Charge const feeMalformedRequest(200, "malformed request");
Charge const feeRequestNoReply(10, "unsatisfiable request");
Charge const feeInvalidSignature(1000, "invalid signature");
Charge const feeUnwantedData(150, "useless data");
Charge const feeBadData(200, "invalid data");
Charge const feeInvalidSignature(2000, "invalid signature");
Charge const feeUselessData(150, "useless data");
Charge const feeInvalidData(400, "invalid data");
Charge const feeInvalidRPC(100, "malformed RPC");
Charge const feeMalformedRPC(100, "malformed RPC");
Charge const feeReferenceRPC(20, "reference RPC");
Charge const feeExceptionRPC(100, "exceptioned RPC");
Charge const feeMediumBurdenRPC(400, "medium RPC");
Charge const feeHighBurdenRPC(3000, "heavy RPC");
Charge const feeHeavyBurdenRPC(3000, "heavy RPC");
Charge const feeLightPeer(1, "trivial peer request");
Charge const feeMediumBurdenPeer(250, "moderate peer request");
Charge const feeHighBurdenPeer(2000, "heavy peer request");
Charge const feeTrivialPeer(1, "trivial peer request");
Charge const feeModerateBurdenPeer(250, "moderate peer request");
Charge const feeHeavyBurdenPeer(2000, "heavy peer request");
Charge const feeWarning(2000, "received warning");
Charge const feeDrop(3000, "dropped");
Charge const feeWarning(4000, "received warning");
Charge const feeDrop(6000, "dropped");
// See also Resource::Logic::charge for log level cutoff values
} // namespace Resource
} // namespace ripple

View File

@@ -221,7 +221,8 @@ public:
return {};
}
void
charge(Resource::Charge const& fee) override
charge(Resource::Charge const& fee, std::string const& context = {})
override
{
}
id_t

View File

@@ -88,7 +88,8 @@ public:
return {};
}
void
charge(Resource::Charge const& fee) override
charge(Resource::Charge const& fee, std::string const& context = {})
override
{
}
bool

View File

@@ -16,6 +16,7 @@
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <test/jtx.h>
#include <test/jtx/Env.h>
#include <xrpld/overlay/detail/OverlayImpl.h>
#include <xrpld/overlay/detail/PeerImp.h>
@@ -222,14 +223,21 @@ private:
rid_ = 0;
for (int i = 0; i < nPeers; i++)
addPeer(env, peers, nDisabled);
protocol::TMTransaction m;
m.set_rawtransaction("transaction");
m.set_deferred(false);
m.set_status(protocol::TransactionStatus::tsNEW);
env.app().overlay().relay(uint256{0}, m, toSkip);
BEAST_EXPECT(
PeerTest::sendTx_ == expectRelay &&
PeerTest::queueTx_ == expectQueue);
auto const jtx = env.jt(noop(env.master));
if (BEAST_EXPECT(jtx.stx))
{
protocol::TMTransaction m;
Serializer s;
jtx.stx->add(s);
m.set_rawtransaction(s.data(), s.size());
m.set_deferred(false);
m.set_status(protocol::TransactionStatus::tsNEW);
env.app().overlay().relay(uint256{0}, m, toSkip);
BEAST_EXPECT(
PeerTest::sendTx_ == expectRelay &&
PeerTest::queueTx_ == expectQueue);
}
}
void

View File

@@ -1074,7 +1074,8 @@ InboundLedger::processData(
if (packet.nodes().empty())
{
JLOG(journal_.warn()) << peer->id() << ": empty header data";
peer->charge(Resource::feeInvalidRequest);
peer->charge(
Resource::feeMalformedRequest, "ledger_data empty header");
return -1;
}
@@ -1089,7 +1090,9 @@ InboundLedger::processData(
if (!takeHeader(packet.nodes(0).nodedata()))
{
JLOG(journal_.warn()) << "Got invalid header data";
peer->charge(Resource::feeInvalidRequest);
peer->charge(
Resource::feeMalformedRequest,
"ledger_data invalid header");
return -1;
}
@@ -1112,7 +1115,8 @@ InboundLedger::processData(
{
JLOG(journal_.warn())
<< "Included AS/TX root invalid: " << ex.what();
peer->charge(Resource::feeBadData);
using namespace std::string_literals;
peer->charge(Resource::feeInvalidData, "ledger_data "s + ex.what());
return -1;
}
@@ -1132,7 +1136,7 @@ InboundLedger::processData(
if (packet.nodes().empty())
{
JLOG(journal_.info()) << peer->id() << ": response with no nodes";
peer->charge(Resource::feeInvalidRequest);
peer->charge(Resource::feeMalformedRequest, "ledger_data no nodes");
return -1;
}
@@ -1144,7 +1148,8 @@ InboundLedger::processData(
if (!node.has_nodeid() || !node.has_nodedata())
{
JLOG(journal_.warn()) << "Got bad node";
peer->charge(Resource::feeInvalidRequest);
peer->charge(
Resource::feeMalformedRequest, "ledger_data bad node");
return -1;
}
}

View File

@@ -146,7 +146,7 @@ public:
if (ta == nullptr)
{
peer->charge(Resource::feeUnwantedData);
peer->charge(Resource::feeUselessData, "ledger_data");
return;
}
@@ -157,7 +157,7 @@ public:
{
if (!node.has_nodeid() || !node.has_nodedata())
{
peer->charge(Resource::feeInvalidRequest);
peer->charge(Resource::feeMalformedRequest, "ledger_data");
return;
}
@@ -165,7 +165,7 @@ public:
if (!id)
{
peer->charge(Resource::feeBadData);
peer->charge(Resource::feeInvalidData, "ledger_data");
return;
}
@@ -173,7 +173,7 @@ public:
}
if (!ta->takeNodes(data, peer).isUseful())
peer->charge(Resource::feeUnwantedData);
peer->charge(Resource::feeUselessData, "ledger_data not useful");
}
void

View File

@@ -2135,7 +2135,7 @@ LedgerMaster::makeFetchPack(
{
JLOG(m_journal.info())
<< "Peer requests fetch pack for ledger we don't have: " << have;
peer->charge(Resource::feeRequestNoReply);
peer->charge(Resource::feeRequestNoReply, "get_object ledger");
return;
}
@@ -2143,14 +2143,14 @@ LedgerMaster::makeFetchPack(
{
JLOG(m_journal.warn())
<< "Peer requests fetch pack from open ledger: " << have;
peer->charge(Resource::feeInvalidRequest);
peer->charge(Resource::feeMalformedRequest, "get_object ledger open");
return;
}
if (have->info().seq < getEarliestFetch())
{
JLOG(m_journal.debug()) << "Peer requests fetch pack that is too early";
peer->charge(Resource::feeInvalidRequest);
peer->charge(Resource::feeMalformedRequest, "get_object ledger early");
return;
}
@@ -2161,7 +2161,8 @@ LedgerMaster::makeFetchPack(
JLOG(m_journal.info())
<< "Peer requests fetch pack for ledger whose predecessor we "
<< "don't have: " << have;
peer->charge(Resource::feeRequestNoReply);
peer->charge(
Resource::feeRequestNoReply, "get_object ledger no parent");
return;
}

View File

@@ -183,7 +183,7 @@ public:
virtual void
relay(
uint256 const& hash,
protocol::TMTransaction& m,
std::optional<std::reference_wrapper<protocol::TMTransaction>> m,
std::set<Peer::id_t> const& toSkip) = 0;
/** Visit every active peer.

View File

@@ -77,7 +77,7 @@ public:
/** Adjust this peer's load balance based on the type of load imposed. */
virtual void
charge(Resource::Charge const& fee) = 0;
charge(Resource::Charge const& fee, std::string const& context) = 0;
//
// Identity

View File

@@ -35,6 +35,7 @@
#include <xrpl/basics/make_SSLContext.h>
#include <xrpl/basics/random.h>
#include <xrpl/beast/core/LexicalCast.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/server/SimpleWriter.h>
#include <boost/algorithm/string/predicate.hpp>
@@ -1211,17 +1212,39 @@ OverlayImpl::getManifestsMessage()
void
OverlayImpl::relay(
uint256 const& hash,
protocol::TMTransaction& m,
std::optional<std::reference_wrapper<protocol::TMTransaction>> tx,
std::set<Peer::id_t> const& toSkip)
{
auto const sm = std::make_shared<Message>(m, protocol::mtTRANSACTION);
bool relay = tx.has_value();
if (relay)
{
auto& txn = tx->get();
SerialIter sit(makeSlice(txn.rawtransaction()));
relay = !isPseudoTx(STTx{sit});
}
Overlay::PeerSequence peers = {};
std::size_t total = 0;
std::size_t disabled = 0;
std::size_t enabledInSkip = 0;
// total peers excluding peers in toSkip
auto peers = getActivePeers(toSkip, total, disabled, enabledInSkip);
auto minRelay = app_.config().TX_REDUCE_RELAY_MIN_PEERS + disabled;
if (!relay)
{
if (!app_.config().TX_REDUCE_RELAY_ENABLE)
return;
peers = getActivePeers(toSkip, total, disabled, enabledInSkip);
JLOG(journal_.trace())
<< "not relaying tx, total peers " << peers.size();
for (auto const& p : peers)
p->addTxQueue(hash);
return;
}
auto& txn = tx->get();
auto const sm = std::make_shared<Message>(txn, protocol::mtTRANSACTION);
peers = getActivePeers(toSkip, total, disabled, enabledInSkip);
auto const minRelay = app_.config().TX_REDUCE_RELAY_MIN_PEERS + disabled;
if (!app_.config().TX_REDUCE_RELAY_ENABLE || total <= minRelay)
{
@@ -1236,7 +1259,7 @@ OverlayImpl::relay(
// We have more peers than the minimum (disabled + minimum enabled),
// relay to all disabled and some randomly selected enabled that
// do not have the transaction.
auto enabledTarget = app_.config().TX_REDUCE_RELAY_MIN_PEERS +
auto const enabledTarget = app_.config().TX_REDUCE_RELAY_MIN_PEERS +
(total - minRelay) * app_.config().TX_RELAY_PERCENTAGE / 100;
txMetrics_.addMetrics(enabledTarget, toSkip.size(), disabled);

View File

@@ -238,7 +238,7 @@ public:
void
relay(
uint256 const&,
protocol::TMTransaction& m,
std::optional<std::reference_wrapper<protocol::TMTransaction>> m,
std::set<Peer::id_t> const& skip) override;
std::shared_ptr<Message>

View File

@@ -62,6 +62,9 @@ std::chrono::milliseconds constexpr peerHighLatency{300};
std::chrono::seconds constexpr peerTimerInterval{60};
} // namespace
// TODO: Remove this exclusion once unit tests are added after the hotfix
// release.
PeerImp::PeerImp(
Application& app,
id_t id,
@@ -95,7 +98,7 @@ PeerImp::PeerImp(
, creationTime_(clock_type::now())
, squelch_(app_.journal("Squelch"))
, usage_(consumer)
, fee_(Resource::feeLightPeer)
, fee_{Resource::feeTrivialPeer, ""}
, slot_(slot)
, request_(std::move(request))
, headers_(request_)
@@ -339,9 +342,9 @@ PeerImp::removeTxQueue(uint256 const& hash)
}
void
PeerImp::charge(Resource::Charge const& fee)
PeerImp::charge(Resource::Charge const& fee, std::string const& context)
{
if ((usage_.charge(fee) == Resource::drop) &&
if ((usage_.charge(fee, context) == Resource::drop) &&
usage_.disconnect(p_journal_) && strand_.running_in_this_thread())
{
// Sever the connection
@@ -1010,9 +1013,9 @@ PeerImp::onMessageBegin(
std::size_t uncompressed_size,
bool isCompressed)
{
load_event_ =
app_.getJobQueue().makeLoadEvent(jtPEER, protocolMessageName(type));
fee_ = Resource::feeLightPeer;
auto const name = protocolMessageName(type);
load_event_ = app_.getJobQueue().makeLoadEvent(jtPEER, name);
fee_ = {Resource::feeTrivialPeer, name};
auto const category = TrafficCount::categorize(*m, type, true);
overlay_.reportTraffic(category, true, static_cast<int>(size));
using namespace protocol;
@@ -1042,7 +1045,7 @@ PeerImp::onMessageEnd(
std::shared_ptr<::google::protobuf::Message> const&)
{
load_event_.reset();
charge(fee_);
charge(fee_.fee, fee_.context);
}
void
@@ -1052,12 +1055,12 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMManifests> const& m)
if (s == 0)
{
fee_ = Resource::feeUnwantedData;
fee_.update(Resource::feeUselessData, "empty");
return;
}
if (s > 100)
fee_ = Resource::feeMediumBurdenPeer;
fee_.update(Resource::feeModerateBurdenPeer, "oversize");
app_.getJobQueue().addJob(
jtMANIFEST, "receiveManifests", [this, that = shared_from_this(), m]() {
@@ -1071,7 +1074,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMPing> const& m)
if (m->type() == protocol::TMPing::ptPING)
{
// We have received a ping request, reply with a pong
fee_ = Resource::feeMediumBurdenPeer;
fee_.update(Resource::feeModerateBurdenPeer, "ping request");
m->set_type(protocol::TMPing::ptPONG);
send(std::make_shared<Message>(*m, protocol::mtPING));
return;
@@ -1108,7 +1111,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMCluster> const& m)
// VFALCO NOTE I think we should drop the peer immediately
if (!cluster())
{
fee_ = Resource::feeUnwantedData;
fee_.fee = Resource::feeUselessData;
return;
}
@@ -1186,7 +1189,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
// implication for the protocol.
if (m->endpoints_v2().size() >= 1024)
{
charge(Resource::feeBadData);
charge(Resource::feeInvalidData, "endpoints too large");
return;
}
@@ -1201,7 +1204,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
{
JLOG(p_journal_.error()) << "failed to parse incoming endpoint: {"
<< tm.endpoint() << "}";
charge(Resource::feeBadData);
charge(Resource::feeInvalidData, "endpoints malformed");
continue;
}
@@ -1224,14 +1227,18 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
void
PeerImp::onMessage(std::shared_ptr<protocol::TMTransaction> const& m)
{
handleTransaction(m, true);
handleTransaction(m, true, false);
}
void
PeerImp::handleTransaction(
std::shared_ptr<protocol::TMTransaction> const& m,
bool eraseTxQueue)
bool eraseTxQueue,
bool batch)
{
XRPL_ASSERT(
eraseTxQueue != batch,
("ripple::PeerImp::handleTransaction correct function params"));
if (tracking_.load() == Tracking::diverged)
return;
@@ -1259,7 +1266,7 @@ PeerImp::handleTransaction(
// we have seen this transaction recently
if (flags & SF_BAD)
{
fee_ = Resource::feeInvalidSignature;
fee_.update(Resource::feeUselessData, "known bad");
JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
}
@@ -1313,9 +1320,11 @@ PeerImp::handleTransaction(
[weak = std::weak_ptr<PeerImp>(shared_from_this()),
flags,
checkSignature,
batch,
stx]() {
if (auto peer = weak.lock())
peer->checkTransaction(flags, checkSignature, stx);
peer->checkTransaction(
flags, checkSignature, stx, batch);
});
}
}
@@ -1331,7 +1340,7 @@ void
PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
{
auto badData = [&](std::string const& msg) {
charge(Resource::feeBadData);
charge(Resource::feeInvalidData, "get_ledger " + msg);
JLOG(p_journal_.warn()) << "TMGetLedger: " << msg;
};
auto const itype{m->itype()};
@@ -1422,11 +1431,12 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest";
if (!ledgerReplayEnabled_)
{
charge(Resource::feeInvalidRequest);
charge(Resource::feeMalformedRequest, "proof_path_request disabled");
return;
}
fee_ = Resource::feeMediumBurdenPeer;
fee_.update(
Resource::feeModerateBurdenPeer, "received a proof path request");
std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob(
jtREPLAY_REQ, "recvProofPathRequest", [weak, m]() {
@@ -1437,9 +1447,12 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
if (reply.has_error())
{
if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
peer->charge(Resource::feeInvalidRequest);
peer->charge(
Resource::feeMalformedRequest,
"proof_path_request");
else
peer->charge(Resource::feeRequestNoReply);
peer->charge(
Resource::feeRequestNoReply, "proof_path_request");
}
else
{
@@ -1455,13 +1468,13 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m)
{
if (!ledgerReplayEnabled_)
{
charge(Resource::feeInvalidRequest);
charge(Resource::feeMalformedRequest, "proof_path_response disabled");
return;
}
if (!ledgerReplayMsgHandler_.processProofPathResponse(m))
{
charge(Resource::feeBadData);
charge(Resource::feeInvalidData, "proof_path_response");
}
}
@@ -1471,11 +1484,11 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest";
if (!ledgerReplayEnabled_)
{
charge(Resource::feeInvalidRequest);
charge(Resource::feeMalformedRequest, "replay_delta_request disabled");
return;
}
fee_ = Resource::feeMediumBurdenPeer;
fee_.fee = Resource::feeModerateBurdenPeer;
std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob(
jtREPLAY_REQ, "recvReplayDeltaRequest", [weak, m]() {
@@ -1486,9 +1499,13 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
if (reply.has_error())
{
if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
peer->charge(Resource::feeInvalidRequest);
peer->charge(
Resource::feeMalformedRequest,
"replay_delta_request");
else
peer->charge(Resource::feeRequestNoReply);
peer->charge(
Resource::feeRequestNoReply,
"replay_delta_request");
}
else
{
@@ -1504,13 +1521,13 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m)
{
if (!ledgerReplayEnabled_)
{
charge(Resource::feeInvalidRequest);
charge(Resource::feeMalformedRequest, "replay_delta_response disabled");
return;
}
if (!ledgerReplayMsgHandler_.processReplayDeltaResponse(m))
{
charge(Resource::feeBadData);
charge(Resource::feeInvalidData, "replay_delta_response");
}
}
@@ -1518,7 +1535,7 @@ void
PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
{
auto badData = [&](std::string const& msg) {
fee_ = Resource::feeBadData;
fee_.update(Resource::feeInvalidData, msg);
JLOG(p_journal_.warn()) << "TMLedgerData: " << msg;
};
@@ -1618,7 +1635,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
(publicKeyType(makeSlice(set.nodepubkey())) != KeyType::secp256k1))
{
JLOG(p_journal_.warn()) << "Proposal: malformed";
fee_ = Resource::feeInvalidSignature;
fee_.update(
Resource::feeInvalidSignature,
" signature can't be longer than 72 bytes");
return;
}
@@ -1626,7 +1645,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
!stringIsUint256Sized(set.previousledger()))
{
JLOG(p_journal_.warn()) << "Proposal: malformed";
fee_ = Resource::feeInvalidRequest;
fee_.update(Resource::feeMalformedRequest, "bad hashes");
return;
}
@@ -1931,7 +1950,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMHaveTransactionSet> const& m)
{
if (!stringIsUint256Sized(m->hash()))
{
fee_ = Resource::feeInvalidRequest;
fee_.update(Resource::feeMalformedRequest, "bad hash");
return;
}
@@ -1944,7 +1963,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMHaveTransactionSet> const& m)
if (std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) !=
recentTxSets_.end())
{
fee_ = Resource::feeUnwantedData;
fee_.update(Resource::feeUselessData, "duplicate (tsHAVE)");
return;
}
@@ -1966,7 +1985,7 @@ PeerImp::onValidatorListMessage(
JLOG(p_journal_.warn()) << "Ignored malformed " << messageType
<< " from peer " << remote_address_;
// This shouldn't ever happen with a well-behaved peer
fee_ = Resource::feeHighBurdenPeer;
fee_.update(Resource::feeHeavyBurdenPeer, "no blobs");
return;
}
@@ -1983,7 +2002,7 @@ PeerImp::onValidatorListMessage(
// Charging this fee here won't hurt the peer in the normal
// course of operation (ie. refresh every 5 minutes), but
// will add up if the peer is misbehaving.
fee_ = Resource::feeUnwantedData;
fee_.update(Resource::feeUselessData, "duplicate");
return;
}
@@ -2073,27 +2092,30 @@ PeerImp::onValidatorListMessage(
// Charging this fee here won't hurt the peer in the normal
// course of operation (ie. refresh every 5 minutes), but
// will add up if the peer is misbehaving.
fee_ = Resource::feeUnwantedData;
fee_.update(
Resource::feeUselessData,
" duplicate (same_sequence or known_sequence)");
break;
case ListDisposition::stale:
// There are very few good reasons for a peer to send an
// old list, particularly more than once.
fee_ = Resource::feeBadData;
fee_.update(Resource::feeInvalidData, "expired");
break;
case ListDisposition::untrusted:
// Charging this fee here won't hurt the peer in the normal
// course of operation (ie. refresh every 5 minutes), but
// will add up if the peer is misbehaving.
fee_ = Resource::feeUnwantedData;
fee_.update(Resource::feeUselessData, "untrusted");
break;
case ListDisposition::invalid:
// This shouldn't ever happen with a well-behaved peer
fee_ = Resource::feeInvalidSignature;
fee_.update(
Resource::feeInvalidSignature, "invalid list disposition");
break;
case ListDisposition::unsupported_version:
// During a version transition, this may be legitimate.
// If it happens frequently, that's probably bad.
fee_ = Resource::feeBadData;
fee_.update(Resource::feeInvalidData, "version");
break;
default:
UNREACHABLE(
@@ -2174,7 +2196,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidatorList> const& m)
<< "ValidatorList: received validator list from peer using "
<< "protocol version " << to_string(protocol_)
<< " which shouldn't support this feature.";
fee_ = Resource::feeUnwantedData;
fee_.update(Resource::feeUselessData, "unsupported peer");
return;
}
onValidatorListMessage(
@@ -2187,7 +2209,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidatorList> const& m)
{
JLOG(p_journal_.warn()) << "ValidatorList: Exception, " << e.what()
<< " from peer " << remote_address_;
fee_ = Resource::feeBadData;
using namespace std::string_literals;
fee_.update(Resource::feeInvalidData, e.what());
}
}
@@ -2203,7 +2226,7 @@ PeerImp::onMessage(
<< "ValidatorListCollection: received validator list from peer "
<< "using protocol version " << to_string(protocol_)
<< " which shouldn't support this feature.";
fee_ = Resource::feeUnwantedData;
fee_.update(Resource::feeUselessData, "unsupported peer");
return;
}
else if (m->version() < 2)
@@ -2213,7 +2236,7 @@ PeerImp::onMessage(
"version "
<< m->version() << " from peer using protocol version "
<< to_string(protocol_);
fee_ = Resource::feeBadData;
fee_.update(Resource::feeInvalidData, "wrong version");
return;
}
onValidatorListMessage(
@@ -2226,7 +2249,8 @@ PeerImp::onMessage(
{
JLOG(p_journal_.warn()) << "ValidatorListCollection: Exception, "
<< e.what() << " from peer " << remote_address_;
fee_ = Resource::feeBadData;
using namespace std::string_literals;
fee_.update(Resource::feeInvalidData, e.what());
}
}
@@ -2236,7 +2260,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
if (m->validation().size() < 50)
{
JLOG(p_journal_.warn()) << "Validation: Too small";
fee_ = Resource::feeInvalidRequest;
fee_.update(Resource::feeMalformedRequest, "too small");
return;
}
@@ -2264,7 +2288,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
val->getSeenTime()))
{
JLOG(p_journal_.trace()) << "Validation: Not current";
fee_ = Resource::feeUnwantedData;
fee_.update(Resource::feeUselessData, "not current");
return;
}
@@ -2337,7 +2361,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
{
JLOG(p_journal_.warn())
<< "Exception processing validation: " << e.what();
fee_ = Resource::feeInvalidRequest;
using namespace std::string_literals;
fee_.update(Resource::feeMalformedRequest, e.what());
}
}
@@ -2370,7 +2395,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
{
JLOG(p_journal_.error())
<< "TMGetObjectByHash: tx reduce-relay is disabled";
fee_ = Resource::feeInvalidRequest;
fee_.update(Resource::feeMalformedRequest, "disabled");
return;
}
@@ -2383,7 +2408,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
return;
}
fee_ = Resource::feeMediumBurdenPeer;
fee_.update(
Resource::feeModerateBurdenPeer,
" received a get object by hash request");
protocol::TMGetObjectByHash reply;
@@ -2398,7 +2425,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
{
if (!stringIsUint256Sized(packet.ledgerhash()))
{
fee_ = Resource::feeInvalidRequest;
fee_.update(Resource::feeMalformedRequest, "ledger hash");
return;
}
@@ -2502,7 +2529,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMHaveTransactions> const& m)
{
JLOG(p_journal_.error())
<< "TMHaveTransactions: tx reduce-relay is disabled";
fee_ = Resource::feeInvalidRequest;
fee_.update(Resource::feeMalformedRequest, "disabled");
return;
}
@@ -2531,7 +2558,7 @@ PeerImp::handleHaveTransactions(
{
JLOG(p_journal_.error())
<< "TMHaveTransactions with invalid hash size";
fee_ = Resource::feeInvalidRequest;
fee_.update(Resource::feeMalformedRequest, "hash size");
return;
}
@@ -2571,7 +2598,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMTransactions> const& m)
{
JLOG(p_journal_.error())
<< "TMTransactions: tx reduce-relay is disabled";
fee_ = Resource::feeInvalidRequest;
fee_.update(Resource::feeMalformedRequest, "disabled");
return;
}
@@ -2584,7 +2611,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMTransactions> const& m)
handleTransaction(
std::shared_ptr<protocol::TMTransaction>(
m->mutable_transactions(i), [](protocol::TMTransaction*) {}),
false);
false,
true);
}
void
@@ -2600,14 +2628,14 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
if (!m->has_validatorpubkey())
{
charge(Resource::feeBadData);
charge(Resource::feeInvalidData, "squelch no pubkey");
return;
}
auto validator = m->validatorpubkey();
auto const slice{makeSlice(validator)};
if (!publicKeyType(slice))
{
charge(Resource::feeBadData);
charge(Resource::feeInvalidData, "squelch bad pubkey");
return;
}
PublicKey key(slice);
@@ -2615,7 +2643,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
// Ignore non-validator squelch
if (!app_.validators().listed(key))
{
charge(Resource::feeBadData);
charge(Resource::feeInvalidData, "squelch non-validator");
JLOG(p_journal_.debug())
<< "onMessage: TMSquelch discarding non-validator squelch "
<< slice;
@@ -2635,7 +2663,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
if (!m->squelch())
squelch_.removeSquelch(key);
else if (!squelch_.addSquelch(key, std::chrono::seconds{duration}))
charge(Resource::feeBadData);
charge(Resource::feeInvalidData, "squelch duration");
JLOG(p_journal_.debug())
<< "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
@@ -2676,11 +2704,11 @@ PeerImp::doFetchPack(const std::shared_ptr<protocol::TMGetObjectByHash>& packet)
if (!stringIsUint256Sized(packet->ledgerhash()))
{
JLOG(p_journal_.warn()) << "FetchPack hash size malformed";
fee_ = Resource::feeInvalidRequest;
fee_.update(Resource::feeMalformedRequest, "hash size");
return;
}
fee_ = Resource::feeHighBurdenPeer;
fee_.fee = Resource::feeHeavyBurdenPeer;
uint256 const hash{packet->ledgerhash()};
@@ -2705,7 +2733,7 @@ PeerImp::doTransactions(
if (packet->objects_size() > reduce_relay::MAX_TX_QUEUE_SIZE)
{
JLOG(p_journal_.error()) << "doTransactions, invalid number of hashes";
fee_ = Resource::feeInvalidRequest;
fee_.update(Resource::feeMalformedRequest, "too big");
return;
}
@@ -2715,7 +2743,7 @@ PeerImp::doTransactions(
if (!stringIsUint256Sized(obj.hash()))
{
fee_ = Resource::feeInvalidRequest;
fee_.update(Resource::feeMalformedRequest, "hash size");
return;
}
@@ -2727,7 +2755,7 @@ PeerImp::doTransactions(
{
JLOG(p_journal_.error()) << "doTransactions, transaction not found "
<< Slice(hash.data(), hash.size());
fee_ = Resource::feeInvalidRequest;
fee_.update(Resource::feeMalformedRequest, "tx not found");
return;
}
@@ -2752,7 +2780,8 @@ void
PeerImp::checkTransaction(
int flags,
bool checkSignature,
std::shared_ptr<STTx const> const& stx)
std::shared_ptr<STTx const> const& stx,
bool batch)
{
// VFALCO TODO Rewrite to not use exceptions
try
@@ -2763,10 +2792,48 @@ PeerImp::checkTransaction(
app_.getLedgerMaster().getValidLedgerIndex()))
{
app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
charge(Resource::feeUnwantedData);
charge(Resource::feeUselessData, "expired tx");
return;
}
if (isPseudoTx(*stx))
{
// Don't do anything with pseudo transactions except put them in the
// TransactionMaster cache
std::string reason;
auto tx = std::make_shared<Transaction>(stx, reason, app_);
XRPL_ASSERT(
tx->getStatus() == NEW,
"ripple::PeerImp::checkTransaction Transaction created "
"correctly");
if (tx->getStatus() == NEW)
{
JLOG(p_journal_.debug())
<< "Processing " << (batch ? "batch" : "unsolicited")
<< " pseudo-transaction tx " << tx->getID();
app_.getMasterTransaction().canonicalize(&tx);
// Tell the overlay about it, but don't relay it.
auto const toSkip =
app_.getHashRouter().shouldRelay(tx->getID());
if (toSkip)
{
JLOG(p_journal_.debug())
<< "Passing skipped pseudo pseudo-transaction tx "
<< tx->getID();
app_.overlay().relay(tx->getID(), {}, *toSkip);
}
if (!batch)
{
JLOG(p_journal_.debug())
<< "Charging for pseudo-transaction tx " << tx->getID();
charge(Resource::feeUselessData, "pseudo tx");
}
return;
}
}
if (checkSignature)
{
// Check the signature before handing off to the job queue.
@@ -2785,7 +2852,9 @@ PeerImp::checkTransaction(
// Probably not necessary to set SF_BAD, but doesn't hurt.
app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
charge(Resource::feeInvalidSignature);
charge(
Resource::feeInvalidSignature,
"check transaction signature failure");
return;
}
}
@@ -2806,7 +2875,7 @@ PeerImp::checkTransaction(
<< "Exception checking transaction: " << reason;
}
app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
charge(Resource::feeInvalidSignature);
charge(Resource::feeInvalidSignature, "tx (impossible)");
return;
}
@@ -2819,7 +2888,8 @@ PeerImp::checkTransaction(
JLOG(p_journal_.warn())
<< "Exception in " << __func__ << ": " << ex.what();
app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
charge(Resource::feeBadData);
using namespace std::string_literals;
charge(Resource::feeInvalidData, "tx "s + ex.what());
}
}
@@ -2837,8 +2907,9 @@ PeerImp::checkPropose(
if (!cluster() && !peerPos.checkSign())
{
JLOG(p_journal_.warn()) << "Proposal fails sig check";
charge(Resource::feeInvalidSignature);
std::string desc{"Proposal fails sig check"};
JLOG(p_journal_.warn()) << desc;
charge(Resource::feeInvalidSignature, desc);
return;
}
@@ -2874,8 +2945,9 @@ PeerImp::checkValidation(
{
if (!val->isValid())
{
JLOG(p_journal_.debug()) << "Validation forwarded by peer is invalid";
charge(Resource::feeInvalidSignature);
std::string desc{"Validation forwarded by peer is invalid"};
JLOG(p_journal_.debug()) << desc;
charge(Resource::feeInvalidSignature, desc);
return;
}
@@ -2905,7 +2977,8 @@ PeerImp::checkValidation(
{
JLOG(p_journal_.trace())
<< "Exception processing validation: " << ex.what();
charge(Resource::feeInvalidRequest);
using namespace std::string_literals;
charge(Resource::feeMalformedRequest, "validation "s + ex.what());
}
}
@@ -3074,7 +3147,8 @@ PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m)
{
// Do not resource charge a peer responding to a relay
if (!m->has_requestcookie())
charge(Resource::feeInvalidRequest);
charge(
Resource::feeMalformedRequest, "get_ledger ledgerSeq");
ledger.reset();
JLOG(p_journal_.warn())
@@ -3136,7 +3210,8 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
{
// Do not resource charge a peer responding to a relay
if (!m->has_requestcookie())
charge(Resource::feeMediumBurdenPeer);
charge(
Resource::feeModerateBurdenPeer, "received a get ledger request");
std::shared_ptr<Ledger const> ledger;
std::shared_ptr<SHAMap const> sharedMap;
@@ -3246,6 +3321,9 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
for (auto const& d : data)
{
if (ledgerData.nodes_size() >=
Tuning::hardMaxReplyNodes)
break;
protocol::TMLedgerNode* node{ledgerData.add_nodes()};
node->set_nodeid(d.first.getRawString());
node->set_nodedata(d.second.data(), d.second.size());
@@ -3300,6 +3378,9 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
<< ledgerData.nodes_size() << " nodes";
}
if (ledgerData.nodes_size() == 0)
return;
send(std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA));
}

View File

@@ -145,10 +145,29 @@ private:
//
// June 2019
struct ChargeWithContext
{
Resource::Charge fee = Resource::feeTrivialPeer;
std::string context = {};
void
update(Resource::Charge f, std::string const& add)
{
XRPL_ASSERT(
f >= fee, "ripple::PeerImp::ChargeWithContext fee increases");
fee = f;
if (!context.empty())
{
context += " ";
}
context += add;
}
};
std::mutex mutable recentLock_;
protocol::TMStatusChange last_status_;
Resource::Consumer usage_;
Resource::Charge fee_;
ChargeWithContext fee_;
std::shared_ptr<PeerFinder::Slot> const slot_;
boost::beast::multi_buffer read_buffer_;
http_request_type request_;
@@ -304,7 +323,7 @@ public:
}
void
charge(Resource::Charge const& fee) override;
charge(Resource::Charge const& fee, std::string const& context) override;
//
// Identity
@@ -482,11 +501,15 @@ private:
the queue when called from onMessage(TMTransactions) because this
message is a response to the missing transactions request and the queue
would not have any of these transactions.
@param batch is false when called from onMessage(TMTransaction)
and is true when called from onMessage(TMTransactions). If true, then the
transaction is part of a batch, and should not be charged an extra fee.
*/
void
handleTransaction(
std::shared_ptr<protocol::TMTransaction> const& m,
bool eraseTxQueue);
bool eraseTxQueue,
bool batch);
/** Handle protocol message with hashes of transactions that have not
been relayed by an upstream node down to its peers - request
@@ -598,7 +621,8 @@ private:
checkTransaction(
int flags,
bool checkSignature,
std::shared_ptr<STTx const> const& stx);
std::shared_ptr<STTx const> const& stx,
bool batch);
void
checkPropose(
@@ -664,7 +688,7 @@ PeerImp::PeerImp(
, creationTime_(clock_type::now())
, squelch_(app_.journal("Squelch"))
, usage_(usage)
, fee_(Resource::feeLightPeer)
, fee_{Resource::feeTrivialPeer}
, slot_(std::move(slot))
, response_(std::move(response))
, headers_(response_)

View File

@@ -1035,7 +1035,7 @@ getLedgerByContext(RPC::JsonContext& context)
"Exactly one of ledger_hash and ledger_index can be set.");
}
context.loadType = Resource::feeHighBurdenRPC;
context.loadType = Resource::feeHeavyBurdenRPC;
if (hasHash)
{

View File

@@ -465,7 +465,7 @@ ServerHandler::processSession(
if (jv.isMember(jss::api_version))
jr[jss::api_version] = jv[jss::api_version];
is->getConsumer().charge(Resource::feeInvalidRPC);
is->getConsumer().charge(Resource::feeMalformedRPC);
return jr;
}
@@ -482,7 +482,7 @@ ServerHandler::processSession(
is->user());
if (Role::FORBID == role)
{
loadType = Resource::feeInvalidRPC;
loadType = Resource::feeMalformedRPC;
jr[jss::result] = rpcError(rpcFORBIDDEN);
}
else
@@ -750,7 +750,7 @@ ServerHandler::processRequest(
if (role == Role::FORBID)
{
usage.charge(Resource::feeInvalidRPC);
usage.charge(Resource::feeMalformedRPC);
if (!batch)
{
HTTPReply(403, "Forbidden", output, rpcJ);
@@ -764,7 +764,7 @@ ServerHandler::processRequest(
if (!jsonRPC.isMember(jss::method) || jsonRPC[jss::method].isNull())
{
usage.charge(Resource::feeInvalidRPC);
usage.charge(Resource::feeMalformedRPC);
if (!batch)
{
HTTPReply(400, "Null method", output, rpcJ);
@@ -779,7 +779,7 @@ ServerHandler::processRequest(
Json::Value const& method = jsonRPC[jss::method];
if (!method.isString())
{
usage.charge(Resource::feeInvalidRPC);
usage.charge(Resource::feeMalformedRPC);
if (!batch)
{
HTTPReply(400, "method is not string", output, rpcJ);
@@ -795,7 +795,7 @@ ServerHandler::processRequest(
std::string strMethod = method.asString();
if (strMethod.empty())
{
usage.charge(Resource::feeInvalidRPC);
usage.charge(Resource::feeMalformedRPC);
if (!batch)
{
HTTPReply(400, "method is empty", output, rpcJ);
@@ -823,7 +823,7 @@ ServerHandler::processRequest(
else if (!params.isArray() || params.size() != 1)
{
usage.charge(Resource::feeInvalidRPC);
usage.charge(Resource::feeMalformedRPC);
HTTPReply(400, "params unparseable", output, rpcJ);
return;
}
@@ -832,7 +832,7 @@ ServerHandler::processRequest(
params = std::move(params[0u]);
if (!params.isObjectOrNull())
{
usage.charge(Resource::feeInvalidRPC);
usage.charge(Resource::feeMalformedRPC);
HTTPReply(400, "params unparseable", output, rpcJ);
return;
}
@@ -848,7 +848,7 @@ ServerHandler::processRequest(
{
if (!params[jss::ripplerpc].isString())
{
usage.charge(Resource::feeInvalidRPC);
usage.charge(Resource::feeMalformedRPC);
if (!batch)
{
HTTPReply(400, "ripplerpc is not a string", output, rpcJ);

View File

@@ -74,7 +74,7 @@ doGatewayBalances(RPC::JsonContext& context)
if (!id)
return rpcError(rpcACT_MALFORMED);
auto const accountID{std::move(id.value())};
context.loadType = Resource::feeHighBurdenRPC;
context.loadType = Resource::feeHeavyBurdenRPC;
result[jss::account] = toBase58(accountID);

View File

@@ -80,7 +80,7 @@ LedgerHandler::check()
return rpcTOO_BUSY;
}
context_.loadType =
binary ? Resource::feeMediumBurdenRPC : Resource::feeHighBurdenRPC;
binary ? Resource::feeMediumBurdenRPC : Resource::feeHeavyBurdenRPC;
}
if (queue)
{

View File

@@ -52,7 +52,7 @@ doPathFind(RPC::JsonContext& context)
if (sSubCommand == "create")
{
context.loadType = Resource::feeHighBurdenRPC;
context.loadType = Resource::feeHeavyBurdenRPC;
context.infoSub->clearRequest();
return context.app.getPathRequests().makePathRequest(
context.infoSub, lpLedger, context.params);

View File

@@ -34,7 +34,7 @@ doRipplePathFind(RPC::JsonContext& context)
if (context.app.config().PATH_SEARCH_MAX == 0)
return rpcError(rpcNOT_SUPPORTED);
context.loadType = Resource::feeHighBurdenRPC;
context.loadType = Resource::feeHeavyBurdenRPC;
std::shared_ptr<ReadView const> lpLedger;
Json::Value jvResult;

View File

@@ -40,7 +40,7 @@ doSignFor(RPC::JsonContext& context)
rpcNOT_SUPPORTED, "Signing is not supported by this server.");
}
context.loadType = Resource::feeHighBurdenRPC;
context.loadType = Resource::feeHeavyBurdenRPC;
auto const failHard = context.params[jss::fail_hard].asBool();
auto const failType = NetworkOPs::doFailHard(failHard);

View File

@@ -38,7 +38,7 @@ doSign(RPC::JsonContext& context)
rpcNOT_SUPPORTED, "Signing is not supported by this server.");
}
context.loadType = Resource::feeHighBurdenRPC;
context.loadType = Resource::feeHeavyBurdenRPC;
NetworkOPs::FailHard const failType = NetworkOPs::doFailHard(
context.params.isMember(jss::fail_hard) &&
context.params[jss::fail_hard].asBool());

View File

@@ -33,7 +33,7 @@ namespace ripple {
Json::Value
doSubmitMultiSigned(RPC::JsonContext& context)
{
context.loadType = Resource::feeHighBurdenRPC;
context.loadType = Resource::feeHeavyBurdenRPC;
auto const failHard = context.params[jss::fail_hard].asBool();
auto const failType = NetworkOPs::doFailHard(failHard);