mirror of
https://github.com/XRPLF/rippled.git
synced 2026-04-29 15:37:57 +00:00
Move Overlay interface to libxrpl. We can revisit this part to decide whether we should move the Overlay implementation to libxrpl
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
This commit is contained in:
102
include/xrpl/overlay/Compression.h
Normal file
102
include/xrpl/overlay/Compression.h
Normal file
@@ -0,0 +1,102 @@
|
||||
#ifndef XRPL_COMPRESSION_H_INCLUDED
|
||||
#define XRPL_COMPRESSION_H_INCLUDED
|
||||
|
||||
#include <xrpl/basics/CompressionAlgorithms.h>
|
||||
#include <xrpl/basics/Log.h>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
namespace compression {
|
||||
|
||||
std::size_t constexpr headerBytes = 6;
|
||||
std::size_t constexpr headerBytesCompressed = 10;
|
||||
|
||||
// All values other than 'none' must have the high bit. The low order four bits
|
||||
// must be 0.
|
||||
enum class Algorithm : std::uint8_t { None = 0x00, LZ4 = 0x90 };
|
||||
|
||||
enum class Compressed : std::uint8_t { On, Off };
|
||||
|
||||
/** Decompress input stream.
|
||||
* @tparam InputStream ZeroCopyInputStream
|
||||
* @param in Input source stream
|
||||
* @param inSize Size of compressed data
|
||||
* @param decompressed Buffer to hold decompressed message
|
||||
* @param algorithm Compression algorithm type
|
||||
* @return Size of decompressed data or zero if failed to decompress
|
||||
*/
|
||||
template <typename InputStream>
|
||||
std::size_t
|
||||
decompress(
|
||||
InputStream& in,
|
||||
std::size_t inSize,
|
||||
std::uint8_t* decompressed,
|
||||
std::size_t decompressedSize,
|
||||
Algorithm algorithm = Algorithm::LZ4)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (algorithm == Algorithm::LZ4)
|
||||
return xrpl::compression_algorithms::lz4Decompress(
|
||||
in, inSize, decompressed, decompressedSize);
|
||||
else
|
||||
{
|
||||
// LCOV_EXCL_START
|
||||
JLOG(debugLog().warn())
|
||||
<< "decompress: invalid compression algorithm "
|
||||
<< static_cast<int>(algorithm);
|
||||
UNREACHABLE(
|
||||
"xrpl::compression::decompress : invalid compression "
|
||||
"algorithm");
|
||||
// LCOV_EXCL_STOP
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/** Compress input data.
|
||||
* @tparam BufferFactory Callable object or lambda.
|
||||
* Takes the requested buffer size and returns allocated buffer pointer.
|
||||
* @param in Data to compress
|
||||
* @param inSize Size of the data
|
||||
* @param bf Compressed buffer allocator
|
||||
* @param algorithm Compression algorithm type
|
||||
* @return Size of compressed data, or zero if failed to compress
|
||||
*/
|
||||
template <class BufferFactory>
|
||||
std::size_t
|
||||
compress(
|
||||
void const* in,
|
||||
std::size_t inSize,
|
||||
BufferFactory&& bf,
|
||||
Algorithm algorithm = Algorithm::LZ4)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (algorithm == Algorithm::LZ4)
|
||||
return xrpl::compression_algorithms::lz4Compress(
|
||||
in, inSize, std::forward<BufferFactory>(bf));
|
||||
else
|
||||
{
|
||||
// LCOV_EXCL_START
|
||||
JLOG(debugLog().warn()) << "compress: invalid compression algorithm"
|
||||
<< static_cast<int>(algorithm);
|
||||
UNREACHABLE(
|
||||
"xrpl::compression::compress : invalid compression "
|
||||
"algorithm");
|
||||
// LCOV_EXCL_STOP
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
} // namespace compression
|
||||
|
||||
} // namespace xrpl
|
||||
|
||||
#endif // XRPL_COMPRESSION_H_INCLUDED
|
||||
121
include/xrpl/overlay/Message.h
Normal file
121
include/xrpl/overlay/Message.h
Normal file
@@ -0,0 +1,121 @@
|
||||
#ifndef XRPL_OVERLAY_MESSAGE_H_INCLUDED
|
||||
#define XRPL_OVERLAY_MESSAGE_H_INCLUDED
|
||||
|
||||
#include <xrpl/basics/ByteUtilities.h>
|
||||
#include <xrpl/overlay/Compression.h>
|
||||
#include <xrpl/protocol/PublicKey.h>
|
||||
#include <xrpl/protocol/messages.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <cstdint>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
constexpr std::size_t maximiumMessageSize = megabytes(64);
|
||||
|
||||
// VFALCO NOTE If we forward declare Message and write out shared_ptr
|
||||
// instead of using the in-class type alias, we can remove the
|
||||
// entire ripple.pb.h from the main headers.
|
||||
//
|
||||
|
||||
// packaging of messages into length/type-prepended buffers
|
||||
// ready for transmission.
|
||||
//
|
||||
// Message implements simple "packing" of protocol buffers Messages into
|
||||
// a string prepended by a header specifying the message length.
|
||||
// MessageType should be a Message class generated by the protobuf compiler.
|
||||
//
|
||||
|
||||
class Message : public std::enable_shared_from_this<Message>
|
||||
{
|
||||
using Compressed = compression::Compressed;
|
||||
using Algorithm = compression::Algorithm;
|
||||
|
||||
public:
|
||||
/** Constructor
|
||||
* @param message Protocol message to serialize
|
||||
* @param type Protocol message type
|
||||
* @param validator Public Key of the source validator for Validation or
|
||||
* Proposal message. Used to check if the message should be squelched.
|
||||
*/
|
||||
Message(
|
||||
::google::protobuf::Message const& message,
|
||||
protocol::MessageType type,
|
||||
std::optional<PublicKey> const& validator = {});
|
||||
|
||||
/** Retrieve the size of the packed but uncompressed message data. */
|
||||
std::size_t
|
||||
getBufferSize();
|
||||
|
||||
static std::size_t
|
||||
messageSize(::google::protobuf::Message const& message);
|
||||
|
||||
static std::size_t
|
||||
totalSize(::google::protobuf::Message const& message);
|
||||
|
||||
/** Retrieve the packed message data. If compressed message is requested but
|
||||
* the message is not compressible then the uncompressed buffer is returned.
|
||||
* @param compressed Request compressed (Compress::On) or
|
||||
* uncompressed (Compress::Off) payload buffer
|
||||
* @return Payload buffer
|
||||
*/
|
||||
std::vector<uint8_t> const&
|
||||
getBuffer(Compressed tryCompressed);
|
||||
|
||||
/** Get the traffic category */
|
||||
std::size_t
|
||||
getCategory() const
|
||||
{
|
||||
return category_;
|
||||
}
|
||||
|
||||
/** Get the validator's key */
|
||||
std::optional<PublicKey> const&
|
||||
getValidatorKey() const
|
||||
{
|
||||
return validatorKey_;
|
||||
}
|
||||
|
||||
private:
|
||||
std::vector<uint8_t> buffer_;
|
||||
std::vector<uint8_t> bufferCompressed_;
|
||||
std::size_t category_;
|
||||
std::once_flag once_flag_;
|
||||
std::optional<PublicKey> validatorKey_;
|
||||
|
||||
/** Set the payload header
|
||||
* @param in Pointer to the payload
|
||||
* @param payloadBytes Size of the payload excluding the header size
|
||||
* @param type Protocol message type
|
||||
* @param compression Compression algorithm used in compression,
|
||||
* currently LZ4 only. If None then the message is uncompressed.
|
||||
* @param uncompressedBytes Size of the uncompressed message
|
||||
*/
|
||||
void
|
||||
setHeader(
|
||||
std::uint8_t* in,
|
||||
std::uint32_t payloadBytes,
|
||||
int type,
|
||||
Algorithm compression,
|
||||
std::uint32_t uncompressedBytes);
|
||||
|
||||
/** Try to compress the payload.
|
||||
* Can be called concurrently by multiple peers but is compressed once.
|
||||
* If the message is not compressible then the serialized buffer_ is used.
|
||||
*/
|
||||
void
|
||||
compress();
|
||||
|
||||
/** Get the message type from the payload header.
|
||||
* First four bytes are the compression/algorithm flag and the payload size.
|
||||
* Next two bytes are the message type
|
||||
* @param in Payload header pointer
|
||||
* @return Message type
|
||||
*/
|
||||
int
|
||||
getType(std::uint8_t const* in) const;
|
||||
};
|
||||
|
||||
} // namespace xrpl
|
||||
|
||||
#endif
|
||||
219
include/xrpl/overlay/Overlay.h
Normal file
219
include/xrpl/overlay/Overlay.h
Normal file
@@ -0,0 +1,219 @@
|
||||
#ifndef XRPL_OVERLAY_OVERLAY_H_INCLUDED
|
||||
#define XRPL_OVERLAY_OVERLAY_H_INCLUDED
|
||||
|
||||
#include <xrpl/beast/utility/PropertyStream.h>
|
||||
#include <xrpl/json/json_value.h>
|
||||
#include <xrpl/overlay/Peer.h>
|
||||
#include <xrpl/server/Handoff.h>
|
||||
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
#include <boost/asio/ssl/context.hpp>
|
||||
#include <boost/beast/core/tcp_stream.hpp>
|
||||
#include <boost/beast/ssl/ssl_stream.hpp>
|
||||
|
||||
#include <functional>
|
||||
#include <optional>
|
||||
|
||||
namespace boost {
|
||||
namespace asio {
|
||||
namespace ssl {
|
||||
class context;
|
||||
}
|
||||
} // namespace asio
|
||||
} // namespace boost
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
/** Manages the set of connected peers. */
|
||||
class Overlay : public beast::PropertyStream::Source
|
||||
{
|
||||
protected:
|
||||
using socket_type = boost::beast::tcp_stream;
|
||||
using stream_type = boost::beast::ssl_stream<socket_type>;
|
||||
|
||||
// VFALCO NOTE The requirement of this constructor is an
|
||||
// unfortunate problem with the API for
|
||||
// PropertyStream
|
||||
Overlay() : beast::PropertyStream::Source("peers")
|
||||
{
|
||||
}
|
||||
|
||||
public:
|
||||
enum class Promote { automatic, never, always };
|
||||
|
||||
struct Setup
|
||||
{
|
||||
explicit Setup() = default;
|
||||
|
||||
std::shared_ptr<boost::asio::ssl::context> context;
|
||||
beast::IP::Address public_ip;
|
||||
int ipLimit = 0;
|
||||
std::uint32_t crawlOptions = 0;
|
||||
std::optional<std::uint32_t> networkID;
|
||||
bool vlEnabled = true;
|
||||
};
|
||||
|
||||
using PeerSequence = std::vector<std::shared_ptr<Peer>>;
|
||||
|
||||
virtual ~Overlay() = default;
|
||||
|
||||
virtual void
|
||||
start()
|
||||
{
|
||||
}
|
||||
|
||||
virtual void
|
||||
stop()
|
||||
{
|
||||
}
|
||||
|
||||
/** Conditionally accept an incoming HTTP request. */
|
||||
virtual Handoff
|
||||
onHandoff(
|
||||
std::unique_ptr<stream_type>&& bundle,
|
||||
http_request_type&& request,
|
||||
boost::asio::ip::tcp::endpoint remote_address) = 0;
|
||||
|
||||
/** Establish a peer connection to the specified endpoint.
|
||||
The call returns immediately, the connection attempt is
|
||||
performed asynchronously.
|
||||
*/
|
||||
virtual void
|
||||
connect(beast::IP::Endpoint const& address) = 0;
|
||||
|
||||
/** Returns the maximum number of peers we are configured to allow. */
|
||||
virtual int
|
||||
limit() = 0;
|
||||
|
||||
/** Returns the number of active peers.
|
||||
Active peers are only those peers that have completed the
|
||||
handshake and are using the peer protocol.
|
||||
*/
|
||||
virtual std::size_t
|
||||
size() const = 0;
|
||||
|
||||
/** Return diagnostics on the status of all peers.
|
||||
@deprecated This is superceded by PropertyStream
|
||||
*/
|
||||
virtual Json::Value
|
||||
json() = 0;
|
||||
|
||||
/** Returns a sequence representing the current list of peers.
|
||||
The snapshot is made at the time of the call.
|
||||
*/
|
||||
virtual PeerSequence
|
||||
getActivePeers() const = 0;
|
||||
|
||||
/** Calls the checkTracking function on each peer
|
||||
@param index the value to pass to the peer's checkTracking function
|
||||
*/
|
||||
virtual void
|
||||
checkTracking(std::uint32_t index) = 0;
|
||||
|
||||
/** Returns the peer with the matching short id, or null. */
|
||||
virtual std::shared_ptr<Peer>
|
||||
findPeerByShortID(Peer::id_t const& id) const = 0;
|
||||
|
||||
/** Returns the peer with the matching public key, or null. */
|
||||
virtual std::shared_ptr<Peer>
|
||||
findPeerByPublicKey(PublicKey const& pubKey) = 0;
|
||||
|
||||
/** Broadcast a proposal. */
|
||||
virtual void
|
||||
broadcast(protocol::TMProposeSet& m) = 0;
|
||||
|
||||
/** Broadcast a validation. */
|
||||
virtual void
|
||||
broadcast(protocol::TMValidation& m) = 0;
|
||||
|
||||
/** Relay a proposal.
|
||||
* @param m the serialized proposal
|
||||
* @param uid the id used to identify this proposal
|
||||
* @param validator The pubkey of the validator that issued this proposal
|
||||
* @return the set of peers which have already sent us this proposal
|
||||
*/
|
||||
virtual std::set<Peer::id_t>
|
||||
relay(
|
||||
protocol::TMProposeSet& m,
|
||||
uint256 const& uid,
|
||||
PublicKey const& validator) = 0;
|
||||
|
||||
/** Relay a validation.
|
||||
* @param m the serialized validation
|
||||
* @param uid the id used to identify this validation
|
||||
* @param validator The pubkey of the validator that issued this validation
|
||||
* @return the set of peers which have already sent us this validation
|
||||
*/
|
||||
virtual std::set<Peer::id_t>
|
||||
relay(
|
||||
protocol::TMValidation& m,
|
||||
uint256 const& uid,
|
||||
PublicKey const& validator) = 0;
|
||||
|
||||
/** Relay a transaction. If the tx reduce-relay feature is enabled then
|
||||
* randomly select peers to relay to and queue transaction's hash
|
||||
* for the rest of the peers.
|
||||
* @param hash transaction's hash
|
||||
* @param m transaction's protocol message to relay
|
||||
* @param toSkip peers which have already seen this transaction
|
||||
*/
|
||||
virtual void
|
||||
relay(
|
||||
uint256 const& hash,
|
||||
std::optional<std::reference_wrapper<protocol::TMTransaction>> m,
|
||||
std::set<Peer::id_t> const& toSkip) = 0;
|
||||
|
||||
/** Visit every active peer.
|
||||
*
|
||||
* The visitor must be invocable as:
|
||||
* Function(std::shared_ptr<Peer> const& peer);
|
||||
*
|
||||
* @param f the invocable to call with every peer
|
||||
*/
|
||||
template <class Function>
|
||||
void
|
||||
foreach(Function f) const
|
||||
{
|
||||
for (auto const& p : getActivePeers())
|
||||
f(p);
|
||||
}
|
||||
|
||||
/** Increment and retrieve counter for transaction job queue overflows. */
|
||||
virtual void
|
||||
incJqTransOverflow() = 0;
|
||||
virtual std::uint64_t
|
||||
getJqTransOverflow() const = 0;
|
||||
|
||||
/** Increment and retrieve counters for total peer disconnects, and
|
||||
* disconnects we initiate for excessive resource consumption.
|
||||
*/
|
||||
virtual void
|
||||
incPeerDisconnect() = 0;
|
||||
virtual std::uint64_t
|
||||
getPeerDisconnect() const = 0;
|
||||
virtual void
|
||||
incPeerDisconnectCharges() = 0;
|
||||
virtual std::uint64_t
|
||||
getPeerDisconnectCharges() const = 0;
|
||||
|
||||
/** Returns the ID of the network this server is configured for, if any.
|
||||
|
||||
The ID is just a numerical identifier, with the IDs 0, 1 and 2 used to
|
||||
identify the mainnet, the testnet and the devnet respectively.
|
||||
|
||||
@return The numerical identifier configured by the administrator of the
|
||||
server. An unseated optional, otherwise.
|
||||
*/
|
||||
virtual std::optional<std::uint32_t>
|
||||
networkID() const = 0;
|
||||
|
||||
/** Returns tx reduce-relay metrics
|
||||
@return json value of tx reduce-relay metrics
|
||||
*/
|
||||
virtual Json::Value
|
||||
txMetrics() const = 0;
|
||||
};
|
||||
|
||||
} // namespace xrpl
|
||||
|
||||
#endif
|
||||
123
include/xrpl/overlay/Peer.h
Normal file
123
include/xrpl/overlay/Peer.h
Normal file
@@ -0,0 +1,123 @@
|
||||
#ifndef XRPL_OVERLAY_PEER_H_INCLUDED
|
||||
#define XRPL_OVERLAY_PEER_H_INCLUDED
|
||||
|
||||
#include <xrpl/basics/base_uint.h>
|
||||
#include <xrpl/beast/net/IPEndpoint.h>
|
||||
#include <xrpl/json/json_value.h>
|
||||
#include <xrpl/overlay/Message.h>
|
||||
#include <xrpl/protocol/PublicKey.h>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
namespace Resource {
|
||||
class Charge;
|
||||
}
|
||||
|
||||
enum class ProtocolFeature {
|
||||
ValidatorListPropagation,
|
||||
ValidatorList2Propagation,
|
||||
LedgerReplay,
|
||||
};
|
||||
|
||||
/** Represents a peer connection in the overlay. */
|
||||
class Peer
|
||||
{
|
||||
public:
|
||||
using ptr = std::shared_ptr<Peer>;
|
||||
|
||||
/** Uniquely identifies a peer.
|
||||
This can be stored in tables to find the peer later. Callers
|
||||
can discover if the peer is no longer connected and make
|
||||
adjustments as needed.
|
||||
*/
|
||||
using id_t = std::uint32_t;
|
||||
|
||||
virtual ~Peer() = default;
|
||||
|
||||
//
|
||||
// Network
|
||||
//
|
||||
|
||||
virtual void
|
||||
send(std::shared_ptr<Message> const& m) = 0;
|
||||
|
||||
virtual beast::IP::Endpoint
|
||||
getRemoteAddress() const = 0;
|
||||
|
||||
/** Send aggregated transactions' hashes. */
|
||||
virtual void
|
||||
sendTxQueue() = 0;
|
||||
|
||||
/** Aggregate transaction's hash. */
|
||||
virtual void
|
||||
addTxQueue(uint256 const&) = 0;
|
||||
|
||||
/** Remove hash from the transactions' hashes queue. */
|
||||
virtual void
|
||||
removeTxQueue(uint256 const&) = 0;
|
||||
|
||||
/** Adjust this peer's load balance based on the type of load imposed. */
|
||||
virtual void
|
||||
charge(Resource::Charge const& fee, std::string const& context) = 0;
|
||||
|
||||
//
|
||||
// Identity
|
||||
//
|
||||
|
||||
virtual id_t
|
||||
id() const = 0;
|
||||
|
||||
/** Returns `true` if this connection is a member of the cluster. */
|
||||
virtual bool
|
||||
cluster() const = 0;
|
||||
|
||||
virtual bool
|
||||
isHighLatency() const = 0;
|
||||
|
||||
virtual int
|
||||
getScore(bool) const = 0;
|
||||
|
||||
virtual PublicKey const&
|
||||
getNodePublic() const = 0;
|
||||
|
||||
virtual Json::Value
|
||||
json() = 0;
|
||||
|
||||
virtual bool
|
||||
supportsFeature(ProtocolFeature f) const = 0;
|
||||
|
||||
virtual std::optional<std::size_t>
|
||||
publisherListSequence(PublicKey const&) const = 0;
|
||||
|
||||
virtual void
|
||||
setPublisherListSequence(PublicKey const&, std::size_t const) = 0;
|
||||
|
||||
virtual std::string const&
|
||||
fingerprint() const = 0;
|
||||
//
|
||||
// Ledger
|
||||
//
|
||||
|
||||
virtual uint256 const&
|
||||
getClosedLedgerHash() const = 0;
|
||||
virtual bool
|
||||
hasLedger(uint256 const& hash, std::uint32_t seq) const = 0;
|
||||
virtual void
|
||||
ledgerRange(std::uint32_t& minSeq, std::uint32_t& maxSeq) const = 0;
|
||||
virtual bool
|
||||
hasTxSet(uint256 const& hash) const = 0;
|
||||
virtual void
|
||||
cycleStatus() = 0;
|
||||
virtual bool
|
||||
hasRange(std::uint32_t uMin, std::uint32_t uMax) = 0;
|
||||
|
||||
virtual bool
|
||||
compressionEnabled() const = 0;
|
||||
|
||||
virtual bool
|
||||
txReduceRelayEnabled() const = 0;
|
||||
};
|
||||
|
||||
} // namespace xrpl
|
||||
|
||||
#endif
|
||||
68
include/xrpl/overlay/PeerSet.h
Normal file
68
include/xrpl/overlay/PeerSet.h
Normal file
@@ -0,0 +1,68 @@
|
||||
#ifndef XRPL_APP_PEERS_PEERSET_H_INCLUDED
|
||||
#define XRPL_APP_PEERS_PEERSET_H_INCLUDED
|
||||
|
||||
#include <xrpld/app/main/Application.h>
|
||||
#include <xrpld/overlay/detail/ProtocolMessage.h>
|
||||
|
||||
#include <xrpl/overlay/Peer.h>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
/** Supports data retrieval by managing a set of peers.
|
||||
|
||||
When desired data (such as a ledger or a transaction set)
|
||||
is missing locally it can be obtained by querying connected
|
||||
peers. This class manages common aspects of the retrieval.
|
||||
Callers maintain the set by adding and removing peers depending
|
||||
on whether the peers have useful information.
|
||||
|
||||
The data is represented by its hash.
|
||||
*/
|
||||
class PeerSet
|
||||
{
|
||||
public:
|
||||
virtual ~PeerSet() = default;
|
||||
|
||||
/**
|
||||
* Try add more peers
|
||||
* @param limit number of peers to add
|
||||
* @param hasItem callback that helps to select peers
|
||||
* @param onPeerAdded callback called when a peer is added
|
||||
*/
|
||||
virtual void
|
||||
addPeers(
|
||||
std::size_t limit,
|
||||
std::function<bool(std::shared_ptr<Peer> const&)> hasItem,
|
||||
std::function<void(std::shared_ptr<Peer> const&)> onPeerAdded) = 0;
|
||||
|
||||
/** send a message */
|
||||
template <typename MessageType>
|
||||
void
|
||||
sendRequest(MessageType const& message, std::shared_ptr<Peer> const& peer)
|
||||
{
|
||||
this->sendRequest(message, protocolMessageType(message), peer);
|
||||
}
|
||||
|
||||
virtual void
|
||||
sendRequest(
|
||||
::google::protobuf::Message const& message,
|
||||
protocol::MessageType type,
|
||||
std::shared_ptr<Peer> const& peer) = 0;
|
||||
|
||||
/** get the set of ids of previously added peers */
|
||||
virtual std::set<Peer::id_t> const&
|
||||
getPeerIds() const = 0;
|
||||
};
|
||||
|
||||
class PeerSetBuilder
|
||||
{
|
||||
public:
|
||||
virtual ~PeerSetBuilder() = default;
|
||||
|
||||
virtual std::unique_ptr<PeerSet>
|
||||
build() = 0;
|
||||
};
|
||||
|
||||
} // namespace xrpl
|
||||
|
||||
#endif
|
||||
359
include/xrpl/overlay/detail/TrafficCount.h
Normal file
359
include/xrpl/overlay/detail/TrafficCount.h
Normal file
@@ -0,0 +1,359 @@
|
||||
#ifndef XRPL_OVERLAY_TRAFFIC_H_INCLUDED
|
||||
#define XRPL_OVERLAY_TRAFFIC_H_INCLUDED
|
||||
|
||||
#include <xrpl/beast/utility/instrumentation.h>
|
||||
#include <xrpl/protocol/messages.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <cstdint>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
/**
|
||||
TrafficCount is used to count ingress and egress wire bytes and number of
|
||||
messages. The general intended usage is as follows:
|
||||
1. Determine the message category by callin TrafficCount::categorize
|
||||
2. Increment the counters for incoming or outgoing traffic by calling
|
||||
TrafficCount::addCount
|
||||
3. Optionally, TrafficCount::addCount can be called at any time to
|
||||
increment additional traffic categories, not captured by
|
||||
TrafficCount::categorize.
|
||||
|
||||
There are two special categories:
|
||||
1. category::total - this category is used to report the total traffic
|
||||
amount. It should be incremented once just after receiving a new message, and
|
||||
once just before sending a message to a peer. Messages whose category is not
|
||||
in TrafficCount::categorize are not included in the total.
|
||||
2. category::unknown - this category is used to report traffic for
|
||||
messages of unknown type.
|
||||
*/
|
||||
class TrafficCount
|
||||
{
|
||||
public:
|
||||
enum category : std::size_t;
|
||||
|
||||
class TrafficStats
|
||||
{
|
||||
public:
|
||||
std::string name;
|
||||
|
||||
std::atomic<std::uint64_t> bytesIn{0};
|
||||
std::atomic<std::uint64_t> bytesOut{0};
|
||||
std::atomic<std::uint64_t> messagesIn{0};
|
||||
std::atomic<std::uint64_t> messagesOut{0};
|
||||
|
||||
TrafficStats(TrafficCount::category cat)
|
||||
: name(TrafficCount::to_string(cat))
|
||||
{
|
||||
}
|
||||
|
||||
TrafficStats(TrafficStats const& ts)
|
||||
: name(ts.name)
|
||||
, bytesIn(ts.bytesIn.load())
|
||||
, bytesOut(ts.bytesOut.load())
|
||||
, messagesIn(ts.messagesIn.load())
|
||||
, messagesOut(ts.messagesOut.load())
|
||||
{
|
||||
}
|
||||
|
||||
operator bool() const
|
||||
{
|
||||
return messagesIn || messagesOut;
|
||||
}
|
||||
};
|
||||
|
||||
// If you add entries to this enum, you need to update the initialization
|
||||
// of the arrays at the bottom of this file which map array numbers to
|
||||
// human-readable, monitoring-tool friendly names.
|
||||
enum category : std::size_t {
|
||||
base, // basic peer overhead, must be first
|
||||
|
||||
cluster, // cluster overhead
|
||||
overlay, // overlay management
|
||||
manifests, // manifest management
|
||||
|
||||
transaction, // transaction messages
|
||||
// The following categories breakdown transaction message type
|
||||
transaction_duplicate, // duplicate transaction messages
|
||||
|
||||
proposal, // proposal messages
|
||||
// The following categories breakdown proposal message type
|
||||
proposal_untrusted, // proposals from untrusted validators
|
||||
proposal_duplicate, // proposals seen previously
|
||||
|
||||
validation, // validation messages
|
||||
// The following categories breakdown validation message type
|
||||
validation_untrusted, // validations from untrusted validators
|
||||
validation_duplicate, // validations seen previously
|
||||
|
||||
validatorlist,
|
||||
|
||||
squelch,
|
||||
squelch_suppressed, // egress traffic amount suppressed by squelching
|
||||
squelch_ignored, // the traffic amount that came from peers ignoring
|
||||
// squelch messages
|
||||
|
||||
// TMHaveSet message:
|
||||
get_set, // transaction sets we try to get
|
||||
share_set, // transaction sets we get
|
||||
|
||||
// TMLedgerData: transaction set candidate
|
||||
ld_tsc_get,
|
||||
ld_tsc_share,
|
||||
|
||||
// TMLedgerData: transaction node
|
||||
ld_txn_get,
|
||||
ld_txn_share,
|
||||
|
||||
// TMLedgerData: account state node
|
||||
ld_asn_get,
|
||||
ld_asn_share,
|
||||
|
||||
// TMLedgerData: generic
|
||||
ld_get,
|
||||
ld_share,
|
||||
|
||||
// TMGetLedger: transaction set candidate
|
||||
gl_tsc_share,
|
||||
gl_tsc_get,
|
||||
|
||||
// TMGetLedger: transaction node
|
||||
gl_txn_share,
|
||||
gl_txn_get,
|
||||
|
||||
// TMGetLedger: account state node
|
||||
gl_asn_share,
|
||||
gl_asn_get,
|
||||
|
||||
// TMGetLedger: generic
|
||||
gl_share,
|
||||
gl_get,
|
||||
|
||||
// TMGetObjectByHash:
|
||||
share_hash_ledger,
|
||||
get_hash_ledger,
|
||||
|
||||
// TMGetObjectByHash:
|
||||
share_hash_tx,
|
||||
get_hash_tx,
|
||||
|
||||
// TMGetObjectByHash: transaction node
|
||||
share_hash_txnode,
|
||||
get_hash_txnode,
|
||||
|
||||
// TMGetObjectByHash: account state node
|
||||
share_hash_asnode,
|
||||
get_hash_asnode,
|
||||
|
||||
// TMGetObjectByHash: CAS
|
||||
share_cas_object,
|
||||
get_cas_object,
|
||||
|
||||
// TMGetObjectByHash: fetch packs
|
||||
share_fetch_pack,
|
||||
get_fetch_pack,
|
||||
|
||||
// TMGetObjectByHash: transactions
|
||||
get_transactions,
|
||||
|
||||
// TMGetObjectByHash: generic
|
||||
share_hash,
|
||||
get_hash,
|
||||
|
||||
// TMProofPathRequest and TMProofPathResponse
|
||||
proof_path_request,
|
||||
proof_path_response,
|
||||
|
||||
// TMReplayDeltaRequest and TMReplayDeltaResponse
|
||||
replay_delta_request,
|
||||
replay_delta_response,
|
||||
|
||||
// TMHaveTransactions
|
||||
have_transactions,
|
||||
|
||||
// TMTransactions
|
||||
requested_transactions,
|
||||
|
||||
// The total p2p bytes sent and received on the wire
|
||||
total,
|
||||
|
||||
unknown // must be last
|
||||
};
|
||||
|
||||
TrafficCount() = default;
|
||||
|
||||
/** Given a protocol message, determine which traffic category it belongs to
|
||||
*/
|
||||
static category
|
||||
categorize(
|
||||
::google::protobuf::Message const& message,
|
||||
protocol::MessageType type,
|
||||
bool inbound);
|
||||
|
||||
/** Account for traffic associated with the given category */
|
||||
void
|
||||
addCount(category cat, bool inbound, int bytes)
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
cat <= category::unknown,
|
||||
"xrpl::TrafficCount::addCount : valid category input");
|
||||
|
||||
auto it = counts_.find(cat);
|
||||
|
||||
// nothing to do, the category does not exist
|
||||
if (it == counts_.end())
|
||||
return;
|
||||
|
||||
if (inbound)
|
||||
{
|
||||
it->second.bytesIn += bytes;
|
||||
++it->second.messagesIn;
|
||||
}
|
||||
else
|
||||
{
|
||||
it->second.bytesOut += bytes;
|
||||
++it->second.messagesOut;
|
||||
}
|
||||
}
|
||||
|
||||
/** An up-to-date copy of all the counters
|
||||
|
||||
@return an object which satisfies the requirements of Container
|
||||
*/
|
||||
auto const&
|
||||
getCounts() const
|
||||
{
|
||||
return counts_;
|
||||
}
|
||||
|
||||
static std::string
|
||||
to_string(category cat)
|
||||
{
|
||||
static std::unordered_map<category, std::string> const category_map = {
|
||||
{base, "overhead"},
|
||||
{cluster, "overhead_cluster"},
|
||||
{overlay, "overhead_overlay"},
|
||||
{manifests, "overhead_manifest"},
|
||||
{transaction, "transactions"},
|
||||
{transaction_duplicate, "transactions_duplicate"},
|
||||
{proposal, "proposals"},
|
||||
{proposal_untrusted, "proposals_untrusted"},
|
||||
{proposal_duplicate, "proposals_duplicate"},
|
||||
{validation, "validations"},
|
||||
{validation_untrusted, "validations_untrusted"},
|
||||
{validation_duplicate, "validations_duplicate"},
|
||||
{validatorlist, "validator_lists"},
|
||||
{squelch, "squelch"},
|
||||
{squelch_suppressed, "squelch_suppressed"},
|
||||
{squelch_ignored, "squelch_ignored"},
|
||||
{get_set, "set_get"},
|
||||
{share_set, "set_share"},
|
||||
{ld_tsc_get, "ledger_data_Transaction_Set_candidate_get"},
|
||||
{ld_tsc_share, "ledger_data_Transaction_Set_candidate_share"},
|
||||
{ld_txn_get, "ledger_data_Transaction_Node_get"},
|
||||
{ld_txn_share, "ledger_data_Transaction_Node_share"},
|
||||
{ld_asn_get, "ledger_data_Account_State_Node_get"},
|
||||
{ld_asn_share, "ledger_data_Account_State_Node_share"},
|
||||
{ld_get, "ledger_data_get"},
|
||||
{ld_share, "ledger_data_share"},
|
||||
{gl_tsc_share, "ledger_Transaction_Set_candidate_share"},
|
||||
{gl_tsc_get, "ledger_Transaction_Set_candidate_get"},
|
||||
{gl_txn_share, "ledger_Transaction_node_share"},
|
||||
{gl_txn_get, "ledger_Transaction_node_get"},
|
||||
{gl_asn_share, "ledger_Account_State_node_share"},
|
||||
{gl_asn_get, "ledger_Account_State_node_get"},
|
||||
{gl_share, "ledger_share"},
|
||||
{gl_get, "ledger_get"},
|
||||
{share_hash_ledger, "getobject_Ledger_share"},
|
||||
{get_hash_ledger, "getobject_Ledger_get"},
|
||||
{share_hash_tx, "getobject_Transaction_share"},
|
||||
{get_hash_tx, "getobject_Transaction_get"},
|
||||
{share_hash_txnode, "getobject_Transaction_node_share"},
|
||||
{get_hash_txnode, "getobject_Transaction_node_get"},
|
||||
{share_hash_asnode, "getobject_Account_State_node_share"},
|
||||
{get_hash_asnode, "getobject_Account_State_node_get"},
|
||||
{share_cas_object, "getobject_CAS_share"},
|
||||
{get_cas_object, "getobject_CAS_get"},
|
||||
{share_fetch_pack, "getobject_Fetch_Pack_share"},
|
||||
{get_fetch_pack, "getobject_Fetch Pack_get"},
|
||||
{get_transactions, "getobject_Transactions_get"},
|
||||
{share_hash, "getobject_share"},
|
||||
{get_hash, "getobject_get"},
|
||||
{proof_path_request, "proof_path_request"},
|
||||
{proof_path_response, "proof_path_response"},
|
||||
{replay_delta_request, "replay_delta_request"},
|
||||
{replay_delta_response, "replay_delta_response"},
|
||||
{have_transactions, "have_transactions"},
|
||||
{requested_transactions, "requested_transactions"},
|
||||
{total, "total"}};
|
||||
|
||||
if (auto it = category_map.find(cat); it != category_map.end())
|
||||
return it->second;
|
||||
|
||||
return "unknown";
|
||||
}
|
||||
|
||||
protected:
|
||||
std::unordered_map<category, TrafficStats> counts_{
|
||||
{base, {base}},
|
||||
{cluster, {cluster}},
|
||||
{overlay, {overlay}},
|
||||
{manifests, {manifests}},
|
||||
{transaction, {transaction}},
|
||||
{transaction_duplicate, {transaction_duplicate}},
|
||||
{proposal, {proposal}},
|
||||
{proposal_untrusted, {proposal_untrusted}},
|
||||
{proposal_duplicate, {proposal_duplicate}},
|
||||
{validation, {validation}},
|
||||
{validation_untrusted, {validation_untrusted}},
|
||||
{validation_duplicate, {validation_duplicate}},
|
||||
{validatorlist, {validatorlist}},
|
||||
{squelch, {squelch}},
|
||||
{squelch_suppressed, {squelch_suppressed}},
|
||||
{squelch_ignored, {squelch_ignored}},
|
||||
{get_set, {get_set}},
|
||||
{share_set, {share_set}},
|
||||
{ld_tsc_get, {ld_tsc_get}},
|
||||
{ld_tsc_share, {ld_tsc_share}},
|
||||
{ld_txn_get, {ld_txn_get}},
|
||||
{ld_txn_share, {ld_txn_share}},
|
||||
{ld_asn_get, {ld_asn_get}},
|
||||
{ld_asn_share, {ld_asn_share}},
|
||||
{ld_get, {ld_get}},
|
||||
{ld_share, {ld_share}},
|
||||
{gl_tsc_share, {gl_tsc_share}},
|
||||
{gl_tsc_get, {gl_tsc_get}},
|
||||
{gl_txn_share, {gl_txn_share}},
|
||||
{gl_txn_get, {gl_txn_get}},
|
||||
{gl_asn_share, {gl_asn_share}},
|
||||
{gl_asn_get, {gl_asn_get}},
|
||||
{gl_share, {gl_share}},
|
||||
{gl_get, {gl_get}},
|
||||
{share_hash_ledger, {share_hash_ledger}},
|
||||
{get_hash_ledger, {get_hash_ledger}},
|
||||
{share_hash_tx, {share_hash_tx}},
|
||||
{get_hash_tx, {get_hash_tx}},
|
||||
{share_hash_txnode, {share_hash_txnode}},
|
||||
{get_hash_txnode, {get_hash_txnode}},
|
||||
{share_hash_asnode, {share_hash_asnode}},
|
||||
{get_hash_asnode, {get_hash_asnode}},
|
||||
{share_cas_object, {share_cas_object}},
|
||||
{get_cas_object, {get_cas_object}},
|
||||
{share_fetch_pack, {share_fetch_pack}},
|
||||
{get_fetch_pack, {get_fetch_pack}},
|
||||
{get_transactions, {get_transactions}},
|
||||
{share_hash, {share_hash}},
|
||||
{get_hash, {get_hash}},
|
||||
{proof_path_request, {proof_path_request}},
|
||||
{proof_path_response, {proof_path_response}},
|
||||
{replay_delta_request, {replay_delta_request}},
|
||||
{replay_delta_response, {replay_delta_response}},
|
||||
{have_transactions, {have_transactions}},
|
||||
{requested_transactions, {requested_transactions}},
|
||||
{total, {total}},
|
||||
{unknown, {unknown}},
|
||||
};
|
||||
};
|
||||
|
||||
} // namespace xrpl
|
||||
#endif
|
||||
Reference in New Issue
Block a user