mirror of
https://github.com/XRPLF/rippled.git
synced 2025-12-06 17:27:55 +00:00
Add metrics for PeerImp to track bandwidth usage
This commit is contained in:
committed by
Manoj doshi
parent
9196d9541a
commit
f4d6b0e1c4
@@ -45,6 +45,7 @@
|
||||
#include <algorithm>
|
||||
#include <memory>
|
||||
#include <sstream>
|
||||
#include <numeric>
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
@@ -375,6 +376,12 @@ PeerImp::json()
|
||||
}
|
||||
}
|
||||
|
||||
ret[jss::metrics] = Json::Value(Json::objectValue);
|
||||
ret[jss::metrics][jss::total_bytes_recv] = std::to_string(metrics_.recv.total_bytes());
|
||||
ret[jss::metrics][jss::total_bytes_sent] = std::to_string(metrics_.sent.total_bytes());
|
||||
ret[jss::metrics][jss::avg_bps_recv] = std::to_string(metrics_.recv.average_bytes());
|
||||
ret[jss::metrics][jss::avg_bps_sent] = std::to_string(metrics_.sent.average_bytes());
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@@ -846,6 +853,8 @@ PeerImp::onReadMessage (error_code ec, std::size_t bytes_transferred)
|
||||
stream << "onReadMessage";
|
||||
}
|
||||
|
||||
metrics_.recv.add_message(bytes_transferred);
|
||||
|
||||
read_buffer_.commit (bytes_transferred);
|
||||
|
||||
while (read_buffer_.size() > 0)
|
||||
@@ -893,6 +902,8 @@ PeerImp::onWriteMessage (error_code ec, std::size_t bytes_transferred)
|
||||
stream << "onWriteMessage";
|
||||
}
|
||||
|
||||
metrics_.sent.add_message(bytes_transferred);
|
||||
|
||||
assert(! send_queue_.empty());
|
||||
send_queue_.pop();
|
||||
if (! send_queue_.empty())
|
||||
@@ -2851,4 +2862,41 @@ PeerImp::isHighLatency() const
|
||||
return latency_ >= Tuning::peerHighLatency;
|
||||
}
|
||||
|
||||
void
|
||||
PeerImp::Metrics::add_message(std::uint64_t bytes)
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
std::unique_lock lock{ mutex_ };
|
||||
|
||||
totalBytes_ += bytes;
|
||||
accumBytes_ += bytes;
|
||||
auto const timeElapsed = clock_type::now() - intervalStart_;
|
||||
auto const timeElapsedInSecs = std::chrono::duration_cast<std::chrono::seconds>(timeElapsed);
|
||||
|
||||
if (timeElapsedInSecs >= 1s)
|
||||
{
|
||||
auto const avgBytes = accumBytes_ / timeElapsedInSecs.count();
|
||||
rollingAvg_.push_back(avgBytes);
|
||||
|
||||
auto const totalBytes = std::accumulate(rollingAvg_.begin(), rollingAvg_.end(), 0ull);
|
||||
rollingAvgBytes_ = totalBytes / rollingAvg_.size();
|
||||
|
||||
intervalStart_ = clock_type::now();
|
||||
accumBytes_ = 0;
|
||||
}
|
||||
}
|
||||
|
||||
std::uint64_t
|
||||
PeerImp::Metrics::average_bytes() const {
|
||||
std::shared_lock lock{ mutex_ };
|
||||
return rollingAvgBytes_;
|
||||
}
|
||||
|
||||
std::uint64_t
|
||||
PeerImp::Metrics::total_bytes() const {
|
||||
std::shared_lock lock{ mutex_ };
|
||||
return totalBytes_;
|
||||
}
|
||||
|
||||
|
||||
} // ripple
|
||||
|
||||
@@ -33,6 +33,7 @@
|
||||
#include <ripple/protocol/STValidation.h>
|
||||
#include <ripple/resource/Fees.h>
|
||||
|
||||
#include <boost/circular_buffer.hpp>
|
||||
#include <boost/endian/conversion.hpp>
|
||||
#include <boost/optional.hpp>
|
||||
#include <cstdint>
|
||||
@@ -198,6 +199,32 @@ private:
|
||||
|
||||
friend class OverlayImpl;
|
||||
|
||||
class Metrics {
|
||||
public:
|
||||
Metrics() = default;
|
||||
Metrics(Metrics const&) = delete;
|
||||
Metrics& operator=(Metrics const&) = delete;
|
||||
Metrics(Metrics&&) = delete;
|
||||
Metrics& operator=(Metrics&&) = delete;
|
||||
|
||||
void add_message(std::uint64_t bytes);
|
||||
std::uint64_t average_bytes() const;
|
||||
std::uint64_t total_bytes() const;
|
||||
|
||||
private:
|
||||
std::shared_mutex mutable mutex_;
|
||||
boost::circular_buffer<std::uint64_t> rollingAvg_{ 30, 0ull };
|
||||
clock_type::time_point intervalStart_{ clock_type::now() };
|
||||
std::uint64_t totalBytes_{ 0 };
|
||||
std::uint64_t accumBytes_{ 0 };
|
||||
std::uint64_t rollingAvgBytes_{ 0 };
|
||||
};
|
||||
|
||||
struct {
|
||||
Metrics sent;
|
||||
Metrics recv;
|
||||
} metrics_;
|
||||
|
||||
public:
|
||||
PeerImp (PeerImp const&) = delete;
|
||||
PeerImp& operator= (PeerImp const&) = delete;
|
||||
|
||||
@@ -126,6 +126,8 @@ JSS ( authorized ); // out: AccountLines
|
||||
JSS ( auth_change ); // out: AccountInfo
|
||||
JSS ( auth_change_queued ); // out: AccountInfo
|
||||
JSS ( available ); // out: ValidatorList
|
||||
JSS ( avg_bps_recv ); // out: Peers
|
||||
JSS ( avg_bps_sent ); // out: Peers
|
||||
JSS ( balance ); // out: AccountLines
|
||||
JSS ( balances ); // out: GatewayBalances
|
||||
JSS ( base ); // out: LogLevel
|
||||
@@ -338,6 +340,7 @@ JSS ( metaData );
|
||||
JSS ( metadata ); // out: TransactionEntry
|
||||
JSS ( method ); // RPC
|
||||
JSS ( methods );
|
||||
JSS ( metrics ); // out: Peers
|
||||
JSS ( min_count ); // in: GetCounts
|
||||
JSS ( min_ledger ); // in: LedgerCleaner
|
||||
JSS ( minimum_fee ); // out: TxQ
|
||||
@@ -489,6 +492,8 @@ JSS ( timeouts ); // out: InboundLedger
|
||||
JSS ( traffic ); // out: Overlay
|
||||
JSS ( total ); // out: counters
|
||||
JSS ( totalCoins ); // out: LedgerToJson
|
||||
JSS ( total_bytes_recv ); // out: Peers
|
||||
JSS ( total_bytes_sent ); // out: Peers
|
||||
JSS ( total_coins ); // out: LedgerToJson
|
||||
JSS ( transTreeHash ); // out: ledger/Ledger.cpp
|
||||
JSS ( transaction ); // in: Tx
|
||||
|
||||
Reference in New Issue
Block a user