diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 88a231e8f..762dc6a69 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -45,6 +45,7 @@ #include #include #include +#include using namespace std::chrono_literals; @@ -375,6 +376,12 @@ PeerImp::json() } } + ret[jss::metrics] = Json::Value(Json::objectValue); + ret[jss::metrics][jss::total_bytes_recv] = std::to_string(metrics_.recv.total_bytes()); + ret[jss::metrics][jss::total_bytes_sent] = std::to_string(metrics_.sent.total_bytes()); + ret[jss::metrics][jss::avg_bps_recv] = std::to_string(metrics_.recv.average_bytes()); + ret[jss::metrics][jss::avg_bps_sent] = std::to_string(metrics_.sent.average_bytes()); + return ret; } @@ -846,6 +853,8 @@ PeerImp::onReadMessage (error_code ec, std::size_t bytes_transferred) stream << "onReadMessage"; } + metrics_.recv.add_message(bytes_transferred); + read_buffer_.commit (bytes_transferred); while (read_buffer_.size() > 0) @@ -893,6 +902,8 @@ PeerImp::onWriteMessage (error_code ec, std::size_t bytes_transferred) stream << "onWriteMessage"; } + metrics_.sent.add_message(bytes_transferred); + assert(! send_queue_.empty()); send_queue_.pop(); if (! send_queue_.empty()) @@ -2851,4 +2862,41 @@ PeerImp::isHighLatency() const return latency_ >= Tuning::peerHighLatency; } +void +PeerImp::Metrics::add_message(std::uint64_t bytes) +{ + using namespace std::chrono_literals; + std::unique_lock lock{ mutex_ }; + + totalBytes_ += bytes; + accumBytes_ += bytes; + auto const timeElapsed = clock_type::now() - intervalStart_; + auto const timeElapsedInSecs = std::chrono::duration_cast(timeElapsed); + + if (timeElapsedInSecs >= 1s) + { + auto const avgBytes = accumBytes_ / timeElapsedInSecs.count(); + rollingAvg_.push_back(avgBytes); + + auto const totalBytes = std::accumulate(rollingAvg_.begin(), rollingAvg_.end(), 0ull); + rollingAvgBytes_ = totalBytes / rollingAvg_.size(); + + intervalStart_ = clock_type::now(); + accumBytes_ = 0; + } +} + +std::uint64_t +PeerImp::Metrics::average_bytes() const { + std::shared_lock lock{ mutex_ }; + return rollingAvgBytes_; +} + +std::uint64_t +PeerImp::Metrics::total_bytes() const { + std::shared_lock lock{ mutex_ }; + return totalBytes_; +} + + } // ripple diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 441f6c1db..070ab8d06 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -33,6 +33,7 @@ #include #include +#include #include #include #include @@ -198,6 +199,32 @@ private: friend class OverlayImpl; + class Metrics { + public: + Metrics() = default; + Metrics(Metrics const&) = delete; + Metrics& operator=(Metrics const&) = delete; + Metrics(Metrics&&) = delete; + Metrics& operator=(Metrics&&) = delete; + + void add_message(std::uint64_t bytes); + std::uint64_t average_bytes() const; + std::uint64_t total_bytes() const; + + private: + std::shared_mutex mutable mutex_; + boost::circular_buffer rollingAvg_{ 30, 0ull }; + clock_type::time_point intervalStart_{ clock_type::now() }; + std::uint64_t totalBytes_{ 0 }; + std::uint64_t accumBytes_{ 0 }; + std::uint64_t rollingAvgBytes_{ 0 }; + }; + + struct { + Metrics sent; + Metrics recv; + } metrics_; + public: PeerImp (PeerImp const&) = delete; PeerImp& operator= (PeerImp const&) = delete; diff --git a/src/ripple/protocol/jss.h b/src/ripple/protocol/jss.h index 1d9529375..9b79a904d 100644 --- a/src/ripple/protocol/jss.h +++ b/src/ripple/protocol/jss.h @@ -126,6 +126,8 @@ JSS ( authorized ); // out: AccountLines JSS ( auth_change ); // out: AccountInfo JSS ( auth_change_queued ); // out: AccountInfo JSS ( available ); // out: ValidatorList +JSS ( avg_bps_recv ); // out: Peers +JSS ( avg_bps_sent ); // out: Peers JSS ( balance ); // out: AccountLines JSS ( balances ); // out: GatewayBalances JSS ( base ); // out: LogLevel @@ -338,6 +340,7 @@ JSS ( metaData ); JSS ( metadata ); // out: TransactionEntry JSS ( method ); // RPC JSS ( methods ); +JSS ( metrics ); // out: Peers JSS ( min_count ); // in: GetCounts JSS ( min_ledger ); // in: LedgerCleaner JSS ( minimum_fee ); // out: TxQ @@ -489,6 +492,8 @@ JSS ( timeouts ); // out: InboundLedger JSS ( traffic ); // out: Overlay JSS ( total ); // out: counters JSS ( totalCoins ); // out: LedgerToJson +JSS ( total_bytes_recv ); // out: Peers +JSS ( total_bytes_sent ); // out: Peers JSS ( total_coins ); // out: LedgerToJson JSS ( transTreeHash ); // out: ledger/Ledger.cpp JSS ( transaction ); // in: Tx