mirror of
https://github.com/XRPLF/rippled.git
synced 2026-06-06 18:26:51 +00:00
Instrument proposal, validation and transaction messages (#5348)
Adds metric counters for the following P2P message types: * Untrusted proposal and validation messages * Duplicate proposal, validation and transaction messages
This commit is contained in:
157
src/test/overlay/traffic_count_test.cpp
Normal file
157
src/test/overlay/traffic_count_test.cpp
Normal file
@@ -0,0 +1,157 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2025 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include <xrpld/overlay/Message.h>
|
||||
#include <xrpld/overlay/detail/TrafficCount.h>
|
||||
|
||||
#include <xrpl/beast/unit_test.h>
|
||||
#include <xrpl/protocol/messages.h>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
namespace test {
|
||||
|
||||
class traffic_count_test : public beast::unit_test::suite
|
||||
{
|
||||
public:
|
||||
traffic_count_test() = default;
|
||||
|
||||
void
|
||||
testCategorize()
|
||||
{
|
||||
testcase("categorize");
|
||||
protocol::TMPing message;
|
||||
message.set_type(protocol::TMPing::ptPING);
|
||||
|
||||
// a known message is categorized to a proper category
|
||||
auto const known =
|
||||
TrafficCount::categorize(message, protocol::mtPING, false);
|
||||
BEAST_EXPECT(known == TrafficCount::category::base);
|
||||
|
||||
// an unknown message type is categorized as unknown
|
||||
auto const unknown = TrafficCount::categorize(
|
||||
message, static_cast<protocol::MessageType>(99), false);
|
||||
BEAST_EXPECT(unknown == TrafficCount::category::unknown);
|
||||
}
|
||||
|
||||
struct TestCase
|
||||
{
|
||||
std::string name;
|
||||
int size;
|
||||
bool inbound;
|
||||
int messageCount;
|
||||
std::uint64_t expectedBytesIn;
|
||||
std::uint64_t expectedBytesOut;
|
||||
std::uint64_t expectedMessagesIn;
|
||||
std::uint64_t expectedMessagesOut;
|
||||
};
|
||||
|
||||
void
|
||||
testAddCount()
|
||||
{
|
||||
auto run = [&](TestCase const& tc) {
|
||||
testcase(tc.name);
|
||||
TrafficCount m_traffic;
|
||||
|
||||
auto const counts = m_traffic.getCounts();
|
||||
std::for_each(counts.begin(), counts.end(), [&](auto const& pair) {
|
||||
for (auto i = 0; i < tc.messageCount; ++i)
|
||||
m_traffic.addCount(pair.first, tc.inbound, tc.size);
|
||||
});
|
||||
|
||||
auto const counts_new = m_traffic.getCounts();
|
||||
std::for_each(
|
||||
counts_new.begin(), counts_new.end(), [&](auto const& pair) {
|
||||
BEAST_EXPECT(
|
||||
pair.second.bytesIn.load() == tc.expectedBytesIn);
|
||||
BEAST_EXPECT(
|
||||
pair.second.bytesOut.load() == tc.expectedBytesOut);
|
||||
BEAST_EXPECT(
|
||||
pair.second.messagesIn.load() == tc.expectedMessagesIn);
|
||||
BEAST_EXPECT(
|
||||
pair.second.messagesOut.load() ==
|
||||
tc.expectedMessagesOut);
|
||||
});
|
||||
};
|
||||
|
||||
auto const testcases = {
|
||||
TestCase{
|
||||
.name = "zero-counts",
|
||||
.size = 0,
|
||||
.inbound = false,
|
||||
.messageCount = 0,
|
||||
.expectedBytesIn = 0,
|
||||
.expectedBytesOut = 0,
|
||||
.expectedMessagesIn = 0,
|
||||
.expectedMessagesOut = 0,
|
||||
},
|
||||
TestCase{
|
||||
.name = "inbound-counts",
|
||||
.size = 10,
|
||||
.inbound = true,
|
||||
.messageCount = 10,
|
||||
.expectedBytesIn = 100,
|
||||
.expectedBytesOut = 0,
|
||||
.expectedMessagesIn = 10,
|
||||
.expectedMessagesOut = 0,
|
||||
},
|
||||
TestCase{
|
||||
.name = "outbound-counts",
|
||||
.size = 10,
|
||||
.inbound = false,
|
||||
.messageCount = 10,
|
||||
.expectedBytesIn = 0,
|
||||
.expectedBytesOut = 100,
|
||||
.expectedMessagesIn = 0,
|
||||
.expectedMessagesOut = 10,
|
||||
},
|
||||
};
|
||||
|
||||
for (auto const& tc : testcases)
|
||||
run(tc);
|
||||
}
|
||||
|
||||
void
|
||||
testToString()
|
||||
{
|
||||
testcase("category-to-string");
|
||||
|
||||
// known category returns known string value
|
||||
BEAST_EXPECT(
|
||||
TrafficCount::to_string(TrafficCount::category::total) == "total");
|
||||
|
||||
// return "unknown" for unknown categories
|
||||
BEAST_EXPECT(
|
||||
TrafficCount::to_string(
|
||||
static_cast<TrafficCount::category>(1000)) == "unknown");
|
||||
}
|
||||
|
||||
void
|
||||
run() override
|
||||
{
|
||||
testCategorize();
|
||||
testAddCount();
|
||||
testToString();
|
||||
}
|
||||
};
|
||||
|
||||
BEAST_DEFINE_TESTSUITE(traffic_count, overlay, ripple);
|
||||
|
||||
} // namespace test
|
||||
} // namespace ripple
|
||||
@@ -60,7 +60,7 @@ public:
|
||||
*/
|
||||
Message(
|
||||
::google::protobuf::Message const& message,
|
||||
int type,
|
||||
protocol::MessageType type,
|
||||
std::optional<PublicKey> const& validator = {});
|
||||
|
||||
/** Retrieve the size of the packed but uncompressed message data. */
|
||||
|
||||
@@ -26,7 +26,7 @@ namespace ripple {
|
||||
|
||||
Message::Message(
|
||||
::google::protobuf::Message const& message,
|
||||
int type,
|
||||
protocol::MessageType type,
|
||||
std::optional<PublicKey> const& validator)
|
||||
: category_(TrafficCount::categorize(message, type, false))
|
||||
, validatorKey_(validator)
|
||||
|
||||
@@ -41,6 +41,8 @@
|
||||
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
|
||||
#include "xrpld/overlay/detail/TrafficCount.h"
|
||||
|
||||
namespace ripple {
|
||||
|
||||
namespace CrawlOptions {
|
||||
@@ -145,13 +147,11 @@ OverlayImpl::OverlayImpl(
|
||||
std::bind(&OverlayImpl::collect_metrics, this),
|
||||
collector,
|
||||
[counts = m_traffic.getCounts(), collector]() {
|
||||
std::vector<TrafficGauges> ret;
|
||||
ret.reserve(counts.size());
|
||||
std::unordered_map<TrafficCount::category, TrafficGauges> ret;
|
||||
|
||||
for (size_t i = 0; i < counts.size(); ++i)
|
||||
{
|
||||
ret.push_back(TrafficGauges(counts[i].name, collector));
|
||||
}
|
||||
for (auto const& pair : counts)
|
||||
ret.emplace(
|
||||
pair.first, TrafficGauges(pair.second.name, collector));
|
||||
|
||||
return ret;
|
||||
}())
|
||||
@@ -580,17 +580,14 @@ OverlayImpl::onWrite(beast::PropertyStream::Map& stream)
|
||||
{
|
||||
beast::PropertyStream::Set set("traffic", stream);
|
||||
auto const stats = m_traffic.getCounts();
|
||||
for (auto const& i : stats)
|
||||
for (auto const& pair : stats)
|
||||
{
|
||||
if (i)
|
||||
{
|
||||
beast::PropertyStream::Map item(set);
|
||||
item["category"] = i.name;
|
||||
item["bytes_in"] = std::to_string(i.bytesIn.load());
|
||||
item["messages_in"] = std::to_string(i.messagesIn.load());
|
||||
item["bytes_out"] = std::to_string(i.bytesOut.load());
|
||||
item["messages_out"] = std::to_string(i.messagesOut.load());
|
||||
}
|
||||
beast::PropertyStream::Map item(set);
|
||||
item["category"] = pair.second.name;
|
||||
item["bytes_in"] = std::to_string(pair.second.bytesIn.load());
|
||||
item["messages_in"] = std::to_string(pair.second.messagesIn.load());
|
||||
item["bytes_out"] = std::to_string(pair.second.bytesOut.load());
|
||||
item["messages_out"] = std::to_string(pair.second.messagesOut.load());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -690,14 +687,16 @@ OverlayImpl::onManifests(
|
||||
}
|
||||
|
||||
void
|
||||
OverlayImpl::reportTraffic(
|
||||
TrafficCount::category cat,
|
||||
bool isInbound,
|
||||
int number)
|
||||
OverlayImpl::reportInboundTraffic(TrafficCount::category cat, int size)
|
||||
{
|
||||
m_traffic.addCount(cat, isInbound, number);
|
||||
m_traffic.addCount(cat, true, size);
|
||||
}
|
||||
|
||||
void
|
||||
OverlayImpl::reportOutboundTraffic(TrafficCount::category cat, int size)
|
||||
{
|
||||
m_traffic.addCount(cat, false, size);
|
||||
}
|
||||
/** The number of active peers on the network
|
||||
Active peers are only those peers that have completed the handshake
|
||||
and are running the Ripple protocol.
|
||||
|
||||
@@ -345,7 +345,10 @@ public:
|
||||
makePrefix(std::uint32_t id);
|
||||
|
||||
void
|
||||
reportTraffic(TrafficCount::category cat, bool isInbound, int bytes);
|
||||
reportInboundTraffic(TrafficCount::category cat, int bytes);
|
||||
|
||||
void
|
||||
reportOutboundTraffic(TrafficCount::category cat, int bytes);
|
||||
|
||||
void
|
||||
incJqTransOverflow() override
|
||||
@@ -561,14 +564,16 @@ private:
|
||||
struct TrafficGauges
|
||||
{
|
||||
TrafficGauges(
|
||||
char const* name,
|
||||
std::string const& name,
|
||||
beast::insight::Collector::ptr const& collector)
|
||||
: bytesIn(collector->make_gauge(name, "Bytes_In"))
|
||||
: name(name)
|
||||
, bytesIn(collector->make_gauge(name, "Bytes_In"))
|
||||
, bytesOut(collector->make_gauge(name, "Bytes_Out"))
|
||||
, messagesIn(collector->make_gauge(name, "Messages_In"))
|
||||
, messagesOut(collector->make_gauge(name, "Messages_Out"))
|
||||
{
|
||||
}
|
||||
std::string const name;
|
||||
beast::insight::Gauge bytesIn;
|
||||
beast::insight::Gauge bytesOut;
|
||||
beast::insight::Gauge messagesIn;
|
||||
@@ -581,7 +586,8 @@ private:
|
||||
Stats(
|
||||
Handler const& handler,
|
||||
beast::insight::Collector::ptr const& collector,
|
||||
std::vector<TrafficGauges>&& trafficGauges_)
|
||||
std::unordered_map<TrafficCount::category, TrafficGauges>&&
|
||||
trafficGauges_)
|
||||
: peerDisconnects(
|
||||
collector->make_gauge("Overlay", "Peer_Disconnects"))
|
||||
, trafficGauges(std::move(trafficGauges_))
|
||||
@@ -590,7 +596,7 @@ private:
|
||||
}
|
||||
|
||||
beast::insight::Gauge peerDisconnects;
|
||||
std::vector<TrafficGauges> trafficGauges;
|
||||
std::unordered_map<TrafficCount::category, TrafficGauges> trafficGauges;
|
||||
beast::insight::Hook hook;
|
||||
};
|
||||
|
||||
@@ -607,13 +613,25 @@ private:
|
||||
counts.size() == m_stats.trafficGauges.size(),
|
||||
"ripple::OverlayImpl::collect_metrics : counts size do match");
|
||||
|
||||
for (std::size_t i = 0; i < counts.size(); ++i)
|
||||
for (auto const& [key, value] : counts)
|
||||
{
|
||||
m_stats.trafficGauges[i].bytesIn = counts[i].bytesIn;
|
||||
m_stats.trafficGauges[i].bytesOut = counts[i].bytesOut;
|
||||
m_stats.trafficGauges[i].messagesIn = counts[i].messagesIn;
|
||||
m_stats.trafficGauges[i].messagesOut = counts[i].messagesOut;
|
||||
auto it = m_stats.trafficGauges.find(key);
|
||||
if (it == m_stats.trafficGauges.end())
|
||||
continue;
|
||||
|
||||
auto& gauge = it->second;
|
||||
|
||||
XRPL_ASSERT(
|
||||
gauge.name == value.name,
|
||||
"ripple::OverlayImpl::collect_metrics : gauge and counter "
|
||||
"match");
|
||||
|
||||
gauge.bytesIn = value.bytesIn;
|
||||
gauge.bytesOut = value.bytesOut;
|
||||
gauge.messagesIn = value.messagesIn;
|
||||
gauge.messagesOut = value.messagesOut;
|
||||
}
|
||||
|
||||
m_stats.peerDisconnects = getPeerDisconnect();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -249,11 +249,21 @@ PeerImp::send(std::shared_ptr<Message> const& m)
|
||||
|
||||
auto validator = m->getValidatorKey();
|
||||
if (validator && !squelch_.expireSquelch(*validator))
|
||||
{
|
||||
overlay_.reportOutboundTraffic(
|
||||
TrafficCount::category::squelch_suppressed,
|
||||
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
|
||||
return;
|
||||
}
|
||||
|
||||
overlay_.reportTraffic(
|
||||
// report categorized outgoing traffic
|
||||
overlay_.reportOutboundTraffic(
|
||||
safe_cast<TrafficCount::category>(m->getCategory()),
|
||||
false,
|
||||
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
|
||||
|
||||
// report total outgoing traffic
|
||||
overlay_.reportOutboundTraffic(
|
||||
TrafficCount::category::total,
|
||||
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
|
||||
|
||||
auto sendq_size = send_queue_.size();
|
||||
@@ -1014,8 +1024,17 @@ PeerImp::onMessageBegin(
|
||||
auto const name = protocolMessageName(type);
|
||||
load_event_ = app_.getJobQueue().makeLoadEvent(jtPEER, name);
|
||||
fee_ = {Resource::feeTrivialPeer, name};
|
||||
auto const category = TrafficCount::categorize(*m, type, true);
|
||||
overlay_.reportTraffic(category, true, static_cast<int>(size));
|
||||
|
||||
auto const category = TrafficCount::categorize(
|
||||
*m, static_cast<protocol::MessageType>(type), true);
|
||||
|
||||
// report total incoming traffic
|
||||
overlay_.reportInboundTraffic(
|
||||
TrafficCount::category::total, static_cast<int>(size));
|
||||
|
||||
// increase the traffic received for a specific category
|
||||
overlay_.reportInboundTraffic(category, static_cast<int>(size));
|
||||
|
||||
using namespace protocol;
|
||||
if ((type == MessageType::mtTRANSACTION ||
|
||||
type == MessageType::mtHAVE_TRANSACTIONS ||
|
||||
@@ -1283,6 +1302,10 @@ PeerImp::handleTransaction(
|
||||
else if (eraseTxQueue && txReduceRelayEnabled())
|
||||
removeTxQueue(txID);
|
||||
|
||||
overlay_.reportInboundTraffic(
|
||||
TrafficCount::category::transaction_duplicate,
|
||||
Message::messageSize(*m));
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1670,8 +1693,16 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
|
||||
// If the operator has specified that untrusted proposals be dropped then
|
||||
// this happens here I.e. before further wasting CPU verifying the signature
|
||||
// of an untrusted key
|
||||
if (!isTrusted && app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
|
||||
return;
|
||||
if (!isTrusted)
|
||||
{
|
||||
// report untrusted proposal messages
|
||||
overlay_.reportInboundTraffic(
|
||||
TrafficCount::category::proposal_untrusted,
|
||||
Message::messageSize(*m));
|
||||
|
||||
if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
|
||||
return;
|
||||
}
|
||||
|
||||
uint256 const proposeHash{set.currenttxhash()};
|
||||
uint256 const prevLedger{set.previousledger()};
|
||||
@@ -1696,7 +1727,14 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
|
||||
(stopwatch().now() - *relayed) < reduce_relay::IDLED)
|
||||
overlay_.updateSlotAndSquelch(
|
||||
suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
|
||||
|
||||
// report duplicate proposal messages
|
||||
overlay_.reportInboundTraffic(
|
||||
TrafficCount::category::proposal_duplicate,
|
||||
Message::messageSize(*m));
|
||||
|
||||
JLOG(p_journal_.trace()) << "Proposal: duplicate";
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -2310,17 +2348,26 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
|
||||
auto const isTrusted =
|
||||
app_.validators().trusted(val->getSignerPublic());
|
||||
|
||||
// If the operator has specified that untrusted validations be dropped
|
||||
// then this happens here I.e. before further wasting CPU verifying the
|
||||
// signature of an untrusted key
|
||||
if (!isTrusted && app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
|
||||
return;
|
||||
// If the operator has specified that untrusted validations be
|
||||
// dropped then this happens here I.e. before further wasting CPU
|
||||
// verifying the signature of an untrusted key
|
||||
if (!isTrusted)
|
||||
{
|
||||
// increase untrusted validations received
|
||||
overlay_.reportInboundTraffic(
|
||||
TrafficCount::category::validation_untrusted,
|
||||
Message::messageSize(*m));
|
||||
|
||||
if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
|
||||
return;
|
||||
}
|
||||
|
||||
auto key = sha512Half(makeSlice(m->validation()));
|
||||
|
||||
if (auto [added, relayed] =
|
||||
app_.getHashRouter().addSuppressionPeerWithStatus(key, id_);
|
||||
!added)
|
||||
auto [added, relayed] =
|
||||
app_.getHashRouter().addSuppressionPeerWithStatus(key, id_);
|
||||
|
||||
if (!added)
|
||||
{
|
||||
// Count unique messages (Slots has it's own 'HashRouter'), which a
|
||||
// peer receives within IDLED seconds since the message has been
|
||||
@@ -2330,6 +2377,12 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
|
||||
(stopwatch().now() - *relayed) < reduce_relay::IDLED)
|
||||
overlay_.updateSlotAndSquelch(
|
||||
key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
|
||||
|
||||
// increase duplicate validations received
|
||||
overlay_.reportInboundTraffic(
|
||||
TrafficCount::category::validation_duplicate,
|
||||
Message::messageSize(*m));
|
||||
|
||||
JLOG(p_journal_.trace()) << "Validation: duplicate";
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -21,36 +21,41 @@
|
||||
|
||||
namespace ripple {
|
||||
|
||||
std::unordered_map<protocol::MessageType, TrafficCount::category> const
|
||||
type_lookup = {
|
||||
{protocol::mtPING, TrafficCount::category::base},
|
||||
{protocol::mtSTATUS_CHANGE, TrafficCount::category::base},
|
||||
{protocol::mtMANIFESTS, TrafficCount::category::manifests},
|
||||
{protocol::mtENDPOINTS, TrafficCount::category::overlay},
|
||||
{protocol::mtTRANSACTION, TrafficCount::category::transaction},
|
||||
{protocol::mtVALIDATORLIST, TrafficCount::category::validatorlist},
|
||||
{protocol::mtVALIDATORLISTCOLLECTION,
|
||||
TrafficCount::category::validatorlist},
|
||||
{protocol::mtVALIDATION, TrafficCount::category::validation},
|
||||
{protocol::mtPROPOSE_LEDGER, TrafficCount::category::proposal},
|
||||
{protocol::mtPROOF_PATH_REQ,
|
||||
TrafficCount::category::proof_path_request},
|
||||
{protocol::mtPROOF_PATH_RESPONSE,
|
||||
TrafficCount::category::proof_path_response},
|
||||
{protocol::mtREPLAY_DELTA_REQ,
|
||||
TrafficCount::category::replay_delta_request},
|
||||
{protocol::mtREPLAY_DELTA_RESPONSE,
|
||||
TrafficCount::category::replay_delta_response},
|
||||
{protocol::mtHAVE_TRANSACTIONS,
|
||||
TrafficCount::category::have_transactions},
|
||||
{protocol::mtTRANSACTIONS,
|
||||
TrafficCount::category::requested_transactions},
|
||||
{protocol::mtSQUELCH, TrafficCount::category::squelch},
|
||||
};
|
||||
|
||||
TrafficCount::category
|
||||
TrafficCount::categorize(
|
||||
::google::protobuf::Message const& message,
|
||||
int type,
|
||||
protocol::MessageType type,
|
||||
bool inbound)
|
||||
{
|
||||
if ((type == protocol::mtPING) || (type == protocol::mtSTATUS_CHANGE))
|
||||
return TrafficCount::category::base;
|
||||
|
||||
if (type == protocol::mtCLUSTER)
|
||||
return TrafficCount::category::cluster;
|
||||
|
||||
if (type == protocol::mtMANIFESTS)
|
||||
return TrafficCount::category::manifests;
|
||||
|
||||
if (type == protocol::mtENDPOINTS)
|
||||
return TrafficCount::category::overlay;
|
||||
|
||||
if (type == protocol::mtTRANSACTION)
|
||||
return TrafficCount::category::transaction;
|
||||
|
||||
if (type == protocol::mtVALIDATORLIST ||
|
||||
type == protocol::mtVALIDATORLISTCOLLECTION)
|
||||
return TrafficCount::category::validatorlist;
|
||||
|
||||
if (type == protocol::mtVALIDATION)
|
||||
return TrafficCount::category::validation;
|
||||
|
||||
if (type == protocol::mtPROPOSE_LEDGER)
|
||||
return TrafficCount::category::proposal;
|
||||
if (auto item = type_lookup.find(type); item != type_lookup.end())
|
||||
return item->second;
|
||||
|
||||
if (type == protocol::mtHAVE_SET)
|
||||
return inbound ? TrafficCount::category::get_set
|
||||
@@ -139,25 +144,6 @@ TrafficCount::categorize(
|
||||
: TrafficCount::category::get_hash;
|
||||
}
|
||||
|
||||
if (type == protocol::mtPROOF_PATH_REQ)
|
||||
return TrafficCount::category::proof_path_request;
|
||||
|
||||
if (type == protocol::mtPROOF_PATH_RESPONSE)
|
||||
return TrafficCount::category::proof_path_response;
|
||||
|
||||
if (type == protocol::mtREPLAY_DELTA_REQ)
|
||||
return TrafficCount::category::replay_delta_request;
|
||||
|
||||
if (type == protocol::mtREPLAY_DELTA_RESPONSE)
|
||||
return TrafficCount::category::replay_delta_response;
|
||||
|
||||
if (type == protocol::mtHAVE_TRANSACTIONS)
|
||||
return TrafficCount::category::have_transactions;
|
||||
|
||||
if (type == protocol::mtTRANSACTIONS)
|
||||
return TrafficCount::category::requested_transactions;
|
||||
|
||||
return TrafficCount::category::unknown;
|
||||
}
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
@@ -23,26 +23,46 @@
|
||||
#include <xrpl/beast/utility/instrumentation.h>
|
||||
#include <xrpl/protocol/messages.h>
|
||||
|
||||
#include <array>
|
||||
#include <atomic>
|
||||
#include <cstdint>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
/**
|
||||
TrafficCount is used to count ingress and egress wire bytes and number of
|
||||
messages. The general intended usage is as follows:
|
||||
1. Determine the message category by callin TrafficCount::categorize
|
||||
2. Increment the counters for incoming or outgoing traffic by calling
|
||||
TrafficCount::addCount
|
||||
3. Optionally, TrafficCount::addCount can be called at any time to
|
||||
increment additional traffic categories, not captured by
|
||||
TrafficCount::categorize.
|
||||
|
||||
There are two special categories:
|
||||
1. category::total - this category is used to report the total traffic
|
||||
amount. It should be incremented once just after receiving a new message, and
|
||||
once just before sending a message to a peer. Messages whose category is not
|
||||
in TrafficCount::categorize are not included in the total.
|
||||
2. category::unknown - this category is used to report traffic for
|
||||
messages of unknown type.
|
||||
*/
|
||||
class TrafficCount
|
||||
{
|
||||
public:
|
||||
enum category : std::size_t;
|
||||
|
||||
class TrafficStats
|
||||
{
|
||||
public:
|
||||
char const* name;
|
||||
std::string name;
|
||||
|
||||
std::atomic<std::uint64_t> bytesIn{0};
|
||||
std::atomic<std::uint64_t> bytesOut{0};
|
||||
std::atomic<std::uint64_t> messagesIn{0};
|
||||
std::atomic<std::uint64_t> messagesOut{0};
|
||||
|
||||
TrafficStats(char const* n) : name(n)
|
||||
TrafficStats(TrafficCount::category cat)
|
||||
: name(TrafficCount::to_string(cat))
|
||||
{
|
||||
}
|
||||
|
||||
@@ -70,11 +90,26 @@ public:
|
||||
cluster, // cluster overhead
|
||||
overlay, // overlay management
|
||||
manifests, // manifest management
|
||||
transaction,
|
||||
proposal,
|
||||
validation,
|
||||
|
||||
transaction, // transaction messages
|
||||
// The following categories breakdown transaction message type
|
||||
transaction_duplicate, // duplicate transaction messages
|
||||
|
||||
proposal, // proposal messages
|
||||
// The following categories breakdown proposal message type
|
||||
proposal_untrusted, // proposals from untrusted validators
|
||||
proposal_duplicate, // proposals seen previously
|
||||
|
||||
validation, // validation messages
|
||||
// The following categories breakdown validation message type
|
||||
validation_untrusted, // validations from untrusted validators
|
||||
validation_duplicate, // validations seen previously
|
||||
|
||||
validatorlist,
|
||||
|
||||
squelch,
|
||||
squelch_suppressed, // egress traffic amount suppressed by squelching
|
||||
|
||||
// TMHaveSet message:
|
||||
get_set, // transaction sets we try to get
|
||||
share_set, // transaction sets we get
|
||||
@@ -156,15 +191,20 @@ public:
|
||||
// TMTransactions
|
||||
requested_transactions,
|
||||
|
||||
// The total p2p bytes sent and received on the wire
|
||||
total,
|
||||
|
||||
unknown // must be last
|
||||
};
|
||||
|
||||
TrafficCount() = default;
|
||||
|
||||
/** Given a protocol message, determine which traffic category it belongs to
|
||||
*/
|
||||
static category
|
||||
categorize(
|
||||
::google::protobuf::Message const& message,
|
||||
int type,
|
||||
protocol::MessageType type,
|
||||
bool inbound);
|
||||
|
||||
/** Account for traffic associated with the given category */
|
||||
@@ -175,20 +215,24 @@ public:
|
||||
cat <= category::unknown,
|
||||
"ripple::TrafficCount::addCount : valid category input");
|
||||
|
||||
auto it = counts_.find(cat);
|
||||
|
||||
// nothing to do, the category does not exist
|
||||
if (it == counts_.end())
|
||||
return;
|
||||
|
||||
if (inbound)
|
||||
{
|
||||
counts_[cat].bytesIn += bytes;
|
||||
++counts_[cat].messagesIn;
|
||||
it->second.bytesIn += bytes;
|
||||
++it->second.messagesIn;
|
||||
}
|
||||
else
|
||||
{
|
||||
counts_[cat].bytesOut += bytes;
|
||||
++counts_[cat].messagesOut;
|
||||
it->second.bytesOut += bytes;
|
||||
++it->second.messagesOut;
|
||||
}
|
||||
}
|
||||
|
||||
TrafficCount() = default;
|
||||
|
||||
/** An up-to-date copy of all the counters
|
||||
|
||||
@return an object which satisfies the requirements of Container
|
||||
@@ -199,57 +243,131 @@ public:
|
||||
return counts_;
|
||||
}
|
||||
|
||||
static std::string
|
||||
to_string(category cat)
|
||||
{
|
||||
static const std::unordered_map<category, std::string> category_map = {
|
||||
{base, "overhead"},
|
||||
{cluster, "overhead_cluster"},
|
||||
{overlay, "overhead_overlay"},
|
||||
{manifests, "overhead_manifest"},
|
||||
{transaction, "transactions"},
|
||||
{transaction_duplicate, "transactions_duplicate"},
|
||||
{proposal, "proposals"},
|
||||
{proposal_untrusted, "proposals_untrusted"},
|
||||
{proposal_duplicate, "proposals_duplicate"},
|
||||
{validation, "validations"},
|
||||
{validation_untrusted, "validations_untrusted"},
|
||||
{validation_duplicate, "validations_duplicate"},
|
||||
{validatorlist, "validator_lists"},
|
||||
{squelch, "squelch"},
|
||||
{squelch_suppressed, "squelch_suppressed"},
|
||||
{get_set, "set_get"},
|
||||
{share_set, "set_share"},
|
||||
{ld_tsc_get, "ledger_data_Transaction_Set_candidate_get"},
|
||||
{ld_tsc_share, "ledger_data_Transaction_Set_candidate_share"},
|
||||
{ld_txn_get, "ledger_data_Transaction_Node_get"},
|
||||
{ld_txn_share, "ledger_data_Transaction_Node_share"},
|
||||
{ld_asn_get, "ledger_data_Account_State_Node_get"},
|
||||
{ld_asn_share, "ledger_data_Account_State_Node_share"},
|
||||
{ld_get, "ledger_data_get"},
|
||||
{ld_share, "ledger_data_share"},
|
||||
{gl_tsc_share, "ledger_Transaction_Set_candidate_share"},
|
||||
{gl_tsc_get, "ledger_Transaction_Set_candidate_get"},
|
||||
{gl_txn_share, "ledger_Transaction_node_share"},
|
||||
{gl_txn_get, "ledger_Transaction_node_get"},
|
||||
{gl_asn_share, "ledger_Account_State_node_share"},
|
||||
{gl_asn_get, "ledger_Account_State_node_get"},
|
||||
{gl_share, "ledger_share"},
|
||||
{gl_get, "ledger_get"},
|
||||
{share_hash_ledger, "getobject_Ledger_share"},
|
||||
{get_hash_ledger, "getobject_Ledger_get"},
|
||||
{share_hash_tx, "getobject_Transaction_share"},
|
||||
{get_hash_tx, "getobject_Transaction_get"},
|
||||
{share_hash_txnode, "getobject_Transaction_node_share"},
|
||||
{get_hash_txnode, "getobject_Transaction_node_get"},
|
||||
{share_hash_asnode, "getobject_Account_State_node_share"},
|
||||
{get_hash_asnode, "getobject_Account_State_node_get"},
|
||||
{share_cas_object, "getobject_CAS_share"},
|
||||
{get_cas_object, "getobject_CAS_get"},
|
||||
{share_fetch_pack, "getobject_Fetch_Pack_share"},
|
||||
{get_fetch_pack, "getobject_Fetch Pack_get"},
|
||||
{get_transactions, "getobject_Transactions_get"},
|
||||
{share_hash, "getobject_share"},
|
||||
{get_hash, "getobject_get"},
|
||||
{proof_path_request, "proof_path_request"},
|
||||
{proof_path_response, "proof_path_response"},
|
||||
{replay_delta_request, "replay_delta_request"},
|
||||
{replay_delta_response, "replay_delta_response"},
|
||||
{have_transactions, "have_transactions"},
|
||||
{requested_transactions, "requested_transactions"},
|
||||
{total, "total"}};
|
||||
|
||||
if (auto it = category_map.find(cat); it != category_map.end())
|
||||
return it->second;
|
||||
|
||||
return "unknown";
|
||||
}
|
||||
|
||||
protected:
|
||||
std::array<TrafficStats, category::unknown + 1> counts_{{
|
||||
{"overhead"}, // category::base
|
||||
{"overhead_cluster"}, // category::cluster
|
||||
{"overhead_overlay"}, // category::overlay
|
||||
{"overhead_manifest"}, // category::manifests
|
||||
{"transactions"}, // category::transaction
|
||||
{"proposals"}, // category::proposal
|
||||
{"validations"}, // category::validation
|
||||
{"validator_lists"}, // category::validatorlist
|
||||
{"set_get"}, // category::get_set
|
||||
{"set_share"}, // category::share_set
|
||||
{"ledger_data_Transaction_Set_candidate_get"}, // category::ld_tsc_get
|
||||
{"ledger_data_Transaction_Set_candidate_share"}, // category::ld_tsc_share
|
||||
{"ledger_data_Transaction_Node_get"}, // category::ld_txn_get
|
||||
{"ledger_data_Transaction_Node_share"}, // category::ld_txn_share
|
||||
{"ledger_data_Account_State_Node_get"}, // category::ld_asn_get
|
||||
{"ledger_data_Account_State_Node_share"}, // category::ld_asn_share
|
||||
{"ledger_data_get"}, // category::ld_get
|
||||
{"ledger_data_share"}, // category::ld_share
|
||||
{"ledger_Transaction_Set_candidate_share"}, // category::gl_tsc_share
|
||||
{"ledger_Transaction_Set_candidate_get"}, // category::gl_tsc_get
|
||||
{"ledger_Transaction_node_share"}, // category::gl_txn_share
|
||||
{"ledger_Transaction_node_get"}, // category::gl_txn_get
|
||||
{"ledger_Account_State_node_share"}, // category::gl_asn_share
|
||||
{"ledger_Account_State_node_get"}, // category::gl_asn_get
|
||||
{"ledger_share"}, // category::gl_share
|
||||
{"ledger_get"}, // category::gl_get
|
||||
{"getobject_Ledger_share"}, // category::share_hash_ledger
|
||||
{"getobject_Ledger_get"}, // category::get_hash_ledger
|
||||
{"getobject_Transaction_share"}, // category::share_hash_tx
|
||||
{"getobject_Transaction_get"}, // category::get_hash_tx
|
||||
{"getobject_Transaction_node_share"}, // category::share_hash_txnode
|
||||
{"getobject_Transaction_node_get"}, // category::get_hash_txnode
|
||||
{"getobject_Account_State_node_share"}, // category::share_hash_asnode
|
||||
{"getobject_Account_State_node_get"}, // category::get_hash_asnode
|
||||
{"getobject_CAS_share"}, // category::share_cas_object
|
||||
{"getobject_CAS_get"}, // category::get_cas_object
|
||||
{"getobject_Fetch_Pack_share"}, // category::share_fetch_pack
|
||||
{"getobject_Fetch Pack_get"}, // category::get_fetch_pack
|
||||
{"getobject_Transactions_get"}, // category::get_transactions
|
||||
{"getobject_share"}, // category::share_hash
|
||||
{"getobject_get"}, // category::get_hash
|
||||
{"proof_path_request"}, // category::proof_path_request
|
||||
{"proof_path_response"}, // category::proof_path_response
|
||||
{"replay_delta_request"}, // category::replay_delta_request
|
||||
{"replay_delta_response"}, // category::replay_delta_response
|
||||
{"have_transactions"}, // category::have_transactions
|
||||
{"requested_transactions"}, // category::transactions
|
||||
{"unknown"} // category::unknown
|
||||
}};
|
||||
std::unordered_map<category, TrafficStats> counts_{
|
||||
{base, {base}},
|
||||
{cluster, {cluster}},
|
||||
{overlay, {overlay}},
|
||||
{manifests, {manifests}},
|
||||
{transaction, {transaction}},
|
||||
{transaction_duplicate, {transaction_duplicate}},
|
||||
{proposal, {proposal}},
|
||||
{proposal_untrusted, {proposal_untrusted}},
|
||||
{proposal_duplicate, {proposal_duplicate}},
|
||||
{validation, {validation}},
|
||||
{validation_untrusted, {validation_untrusted}},
|
||||
{validation_duplicate, {validation_duplicate}},
|
||||
{validatorlist, {validatorlist}},
|
||||
{squelch, {squelch}},
|
||||
{squelch_suppressed, {squelch_suppressed}},
|
||||
{get_set, {get_set}},
|
||||
{share_set, {share_set}},
|
||||
{ld_tsc_get, {ld_tsc_get}},
|
||||
{ld_tsc_share, {ld_tsc_share}},
|
||||
{ld_txn_get, {ld_txn_get}},
|
||||
{ld_txn_share, {ld_txn_share}},
|
||||
{ld_asn_get, {ld_asn_get}},
|
||||
{ld_asn_share, {ld_asn_share}},
|
||||
{ld_get, {ld_get}},
|
||||
{ld_share, {ld_share}},
|
||||
{gl_tsc_share, {gl_tsc_share}},
|
||||
{gl_tsc_get, {gl_tsc_get}},
|
||||
{gl_txn_share, {gl_txn_share}},
|
||||
{gl_txn_get, {gl_txn_get}},
|
||||
{gl_asn_share, {gl_asn_share}},
|
||||
{gl_asn_get, {gl_asn_get}},
|
||||
{gl_share, {gl_share}},
|
||||
{gl_get, {gl_get}},
|
||||
{share_hash_ledger, {share_hash_ledger}},
|
||||
{get_hash_ledger, {get_hash_ledger}},
|
||||
{share_hash_tx, {share_hash_tx}},
|
||||
{get_hash_tx, {get_hash_tx}},
|
||||
{share_hash_txnode, {share_hash_txnode}},
|
||||
{get_hash_txnode, {get_hash_txnode}},
|
||||
{share_hash_asnode, {share_hash_asnode}},
|
||||
{get_hash_asnode, {get_hash_asnode}},
|
||||
{share_cas_object, {share_cas_object}},
|
||||
{get_cas_object, {get_cas_object}},
|
||||
{share_fetch_pack, {share_fetch_pack}},
|
||||
{get_fetch_pack, {get_fetch_pack}},
|
||||
{get_transactions, {get_transactions}},
|
||||
{share_hash, {share_hash}},
|
||||
{get_hash, {get_hash}},
|
||||
{proof_path_request, {proof_path_request}},
|
||||
{proof_path_response, {proof_path_response}},
|
||||
{replay_delta_request, {replay_delta_request}},
|
||||
{replay_delta_response, {replay_delta_response}},
|
||||
{have_transactions, {have_transactions}},
|
||||
{requested_transactions, {requested_transactions}},
|
||||
{total, {total}},
|
||||
{unknown, {unknown}},
|
||||
};
|
||||
};
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
Reference in New Issue
Block a user