From 61e5359231b304bd7c950c8e17e7092872df7737 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Mon, 28 Sep 2015 16:05:05 -0700 Subject: [PATCH] Track peer traffic by category --- Builds/VisualStudio2015/RippleD.vcxproj | 6 + .../VisualStudio2015/RippleD.vcxproj.filters | 6 + src/ripple/overlay/Message.h | 11 +- src/ripple/overlay/impl/Message.cpp | 4 + src/ripple/overlay/impl/OverlayImpl.cpp | 32 +++++ src/ripple/overlay/impl/OverlayImpl.h | 8 ++ src/ripple/overlay/impl/PeerImp.cpp | 9 +- src/ripple/overlay/impl/PeerImp.h | 3 +- src/ripple/overlay/impl/ProtocolMessage.h | 3 +- src/ripple/overlay/impl/TrafficCount.cpp | 135 ++++++++++++++++++ src/ripple/overlay/impl/TrafficCount.h | 124 ++++++++++++++++ src/ripple/protocol/JsonFields.h | 3 +- src/ripple/unity/overlay.cpp | 1 + 13 files changed, 339 insertions(+), 6 deletions(-) create mode 100644 src/ripple/overlay/impl/TrafficCount.cpp create mode 100644 src/ripple/overlay/impl/TrafficCount.h diff --git a/Builds/VisualStudio2015/RippleD.vcxproj b/Builds/VisualStudio2015/RippleD.vcxproj index 18e06abf93..739a4adf19 100644 --- a/Builds/VisualStudio2015/RippleD.vcxproj +++ b/Builds/VisualStudio2015/RippleD.vcxproj @@ -2624,6 +2624,12 @@ + + True + True + + + diff --git a/Builds/VisualStudio2015/RippleD.vcxproj.filters b/Builds/VisualStudio2015/RippleD.vcxproj.filters index 5badef5378..a3941b5ea1 100644 --- a/Builds/VisualStudio2015/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2015/RippleD.vcxproj.filters @@ -3309,6 +3309,12 @@ ripple\overlay\impl + + ripple\overlay\impl + + + ripple\overlay\impl + ripple\overlay\impl diff --git a/src/ripple/overlay/Message.h b/src/ripple/overlay/Message.h index bf1550848c..b662ce0f76 100644 --- a/src/ripple/overlay/Message.h +++ b/src/ripple/overlay/Message.h @@ -63,6 +63,13 @@ public: return mBuffer; } + /** Get the traffic category */ + int + getCategory () const + { + return mCategory; + } + /** Determine bytewise equality. */ bool operator == (Message const& other) const; @@ -148,10 +155,10 @@ private: void encodeHeader (unsigned size, int type); std::vector mBuffer; + + int mCategory; }; } #endif - - diff --git a/src/ripple/overlay/impl/Message.cpp b/src/ripple/overlay/impl/Message.cpp index 947bc5bc74..f2eb397ec7 100644 --- a/src/ripple/overlay/impl/Message.cpp +++ b/src/ripple/overlay/impl/Message.cpp @@ -19,6 +19,7 @@ #include #include +#include #include namespace ripple { @@ -37,6 +38,9 @@ Message::Message (::google::protobuf::Message const& message, int type) { message.SerializeToArray (&mBuffer [Message::kHeaderBytes], messageBytes); } + + mCategory = static_cast(TrafficCount::categorize + (message, type, false)); } bool Message::operator== (Message const& other) const diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index 75c7076fc3..3eb269e095 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -588,6 +589,28 @@ OverlayImpl::onChildrenStopped () void OverlayImpl::onWrite (beast::PropertyStream::Map& stream) { + beast::PropertyStream::Set set ("traffic", stream); + auto stats = m_traffic.getCounts(); + for (auto& i : stats) + { + if (! i.second.messagesIn && ! i.second.messagesOut) + continue; + + beast::PropertyStream::Map item (set); + item["category"] = i.first; + item["bytes_in"] = + beast::lexicalCast + (i.second.bytesIn.load()); + item["messages_in"] = + beast::lexicalCast + (i.second.messagesIn.load()); + item["bytes_out"] = + beast::lexicalCast + (i.second.bytesOut.load()); + item["messages_out"] = + beast::lexicalCast + (i.second.messagesOut.load()); + } } //------------------------------------------------------------------------------ @@ -711,6 +734,15 @@ OverlayImpl::onManifests ( } } +void +OverlayImpl::reportTraffic ( + TrafficCount::category cat, + bool isInbound, + int number) +{ + m_traffic.addCount (cat, isInbound, number); +} + std::size_t OverlayImpl::selectPeers (PeerSet& set, std::size_t limit, std::function const&)> score) diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h index 1084d9b74e..82a4fd0bb4 100644 --- a/src/ripple/overlay/impl/OverlayImpl.h +++ b/src/ripple/overlay/impl/OverlayImpl.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -111,6 +112,7 @@ private: ServerHandler& serverHandler_; Resource::Manager& m_resourceManager; std::unique_ptr m_peerFinder; + TrafficCount m_traffic; hash_map > m_peers; hash_map> m_publicKeyMap; @@ -267,6 +269,12 @@ public: std::string makePrefix (std::uint32_t id); + void + reportTraffic ( + TrafficCount::category cat, + bool isInbound, + int bytes); + private: std::shared_ptr makeRedirectResponse (PeerFinder::Slot::ptr const& slot, diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 07bad96aa3..1074a47b8e 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -192,6 +192,10 @@ PeerImp::send (Message::pointer const& m) if(detaching_) return; + overlay_.reportTraffic ( + static_cast(m->getCategory()), + false, static_cast(m->getBuffer().size())); + auto sendq_size = send_queue_.size(); if (sendq_size < Tuning::targetSendQueue) @@ -813,11 +817,14 @@ PeerImp::onMessageUnknown (std::uint16_t type) PeerImp::error_code PeerImp::onMessageBegin (std::uint16_t type, - std::shared_ptr <::google::protobuf::Message> const& m) + std::shared_ptr <::google::protobuf::Message> const& m, + std::size_t size) { load_event_ = app_.getJobQueue ().getLoadEventAP ( jtPEER, protocolMessageName(type)); fee_ = Resource::feeLightPeer; + overlay_.reportTraffic (TrafficCount::categorize (*m, type, true), + true, static_cast(size)); return error_code{}; } diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 127c480dca..0c14ea25fd 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -403,7 +403,8 @@ public: error_code onMessageBegin (std::uint16_t type, - std::shared_ptr <::google::protobuf::Message> const& m); + std::shared_ptr <::google::protobuf::Message> const& m, + std::size_t size); void onMessageEnd (std::uint16_t type, diff --git a/src/ripple/overlay/impl/ProtocolMessage.h b/src/ripple/overlay/impl/ProtocolMessage.h index 0f3586022d..243af06e3c 100644 --- a/src/ripple/overlay/impl/ProtocolMessage.h +++ b/src/ripple/overlay/impl/ProtocolMessage.h @@ -78,7 +78,8 @@ invoke (int type, Buffers const& buffers, if (! m->ParseFromZeroCopyStream(&stream)) return boost::system::errc::make_error_code( boost::system::errc::invalid_argument); - auto ec = handler.onMessageBegin (type, m); + auto ec = handler.onMessageBegin (type, m, + Message::kHeaderBytes + Message::size (buffers)); if (! ec) { handler.onMessage (m); diff --git a/src/ripple/overlay/impl/TrafficCount.cpp b/src/ripple/overlay/impl/TrafficCount.cpp new file mode 100644 index 0000000000..c46d6ca5cb --- /dev/null +++ b/src/ripple/overlay/impl/TrafficCount.cpp @@ -0,0 +1,135 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include + +namespace ripple { + +const char* TrafficCount::getName (category c) +{ + switch (c) + { + case category::CT_base: + return "overhead"; + case category::CT_overlay: + return "overlay"; + case category::CT_transaction: + return "transactions"; + case category::CT_proposal: + return "proposals"; + case category::CT_validation: + return "validations"; + case category::CT_get_ledger: + return "ledger_get"; + case category::CT_share_ledger: + return "ledger_share"; + case category::CT_get_trans: + return "transaction_set_get"; + case category::CT_share_trans: + return "transaction_set_share"; + case category::CT_unknown: + assert (false); + return "unknown"; + default: + assert (false); + return "truly_unknow"; + } +} + +TrafficCount::category TrafficCount::categorize ( + ::google::protobuf::Message const& message, + int type, bool inbound) +{ + if ((type == protocol::mtHELLO) || + (type == protocol::mtPING) || + (type == protocol::mtCLUSTER) || + (type == protocol::mtSTATUS_CHANGE)) + return TrafficCount::category::CT_base; + + if ((type == protocol::mtMANIFESTS) || + (type == protocol::mtENDPOINTS) || + (type == protocol::mtPEERS) || + (type == protocol::mtGET_PEERS)) + return TrafficCount::category::CT_overlay; + + if (type == protocol::mtTRANSACTION) + return TrafficCount::category::CT_transaction; + + if (type == protocol::mtVALIDATION) + return TrafficCount::category::CT_validation; + + if (type == protocol::mtPROPOSE_LEDGER) + return TrafficCount::category::CT_proposal; + + if (type == protocol::mtHAVE_SET) + return inbound ? TrafficCount::category::CT_get_trans : + TrafficCount::category::CT_share_trans; + + { + auto msg = dynamic_cast + (&message); + if (msg) + { + // We have received ledger data + if (msg->type() == protocol::liTS_CANDIDATE) + return (inbound && !msg->has_requestcookie()) ? + TrafficCount::category::CT_get_trans : + TrafficCount::category::CT_share_trans; + return (inbound && !msg->has_requestcookie()) ? + TrafficCount::category::CT_get_ledger : + TrafficCount::category::CT_share_ledger; + } + } + + { + auto msg = + dynamic_cast + (&message); + if (msg) + { + if (msg->itype() == protocol::liTS_CANDIDATE) + return (inbound || msg->has_requestcookie()) ? + TrafficCount::category::CT_share_trans : + TrafficCount::category::CT_get_trans; + return (inbound || msg->has_requestcookie()) ? + TrafficCount::category::CT_share_ledger : + TrafficCount::category::CT_get_ledger; + } + } + + { + auto msg = + dynamic_cast + (&message); + if (msg) + { + // inbound queries and outbound responses are sharing + // outbound queries and inbound responses are getting + return (msg->query() == inbound) ? + TrafficCount::category::CT_share_ledger : + TrafficCount::category::CT_get_ledger; + } + } + + assert (false); + return TrafficCount::category::CT_unknown; +} + +} // ripple diff --git a/src/ripple/overlay/impl/TrafficCount.h b/src/ripple/overlay/impl/TrafficCount.h new file mode 100644 index 0000000000..81385888d8 --- /dev/null +++ b/src/ripple/overlay/impl/TrafficCount.h @@ -0,0 +1,124 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_OVERLAY_TRAFFIC_H_INCLUDED +#define RIPPLE_OVERLAY_TRAFFIC_H_INCLUDED + +#include "ripple.pb.h" + +#include +#include + +namespace ripple { + +class TrafficCount +{ + +public: + + using count_t = std::atomic ; + + class TrafficStats + { + public: + + count_t bytesIn; + count_t bytesOut; + count_t messagesIn; + count_t messagesOut; + + TrafficStats() : bytesIn(0), bytesOut(0), + messagesIn(0), messagesOut(0) + { ; } + + TrafficStats(const TrafficStats& ts) + : bytesIn (ts.bytesIn.load()) + , bytesOut (ts.bytesOut.load()) + , messagesIn (ts.messagesIn.load()) + , messagesOut (ts.messagesOut.load()) + { ; } + }; + + + enum class category + { + CT_base, // basic peer overhead, must be first + CT_overlay, // overlay management + CT_transaction, + CT_proposal, + CT_validation, + CT_get_ledger, // ledgers we try to get + CT_share_ledger, // ledgers we share + CT_get_trans, // transaction sets we try to get + CT_share_trans, // transaction sets we get + CT_unknown // must be last + }; + + static const char* getName (category c); + + static category categorize ( + ::google::protobuf::Message const& message, + int type, bool inbound); + + void addCount (category cat, bool inbound, int number) + { + if (inbound) + { + counts_[cat].bytesIn += number; + ++counts_[cat].messagesIn; + } + else + { + counts_[cat].bytesOut += number; + ++counts_[cat].messagesOut; + } + } + + TrafficCount() + { + for (category i = category::CT_base; + i <= category::CT_unknown; + i = static_cast(static_cast(i) + 1)) + { + counts_[i]; + } + } + + std::map + getCounts () const + { + std::map ret; + + for (auto& i : counts_) + { + ret.emplace (std::piecewise_construct, + std::forward_as_tuple (getName (i.first)), + std::forward_as_tuple (i.second)); + } + + return ret; + } + + protected: + + std::map counts_; +}; + +} +#endif diff --git a/src/ripple/protocol/JsonFields.h b/src/ripple/protocol/JsonFields.h index d77f0b7909..90114b04ec 100644 --- a/src/ripple/protocol/JsonFields.h +++ b/src/ripple/protocol/JsonFields.h @@ -275,7 +275,7 @@ JSS ( paths_computed ); // out: PathRequest, RipplePathFind JSS ( peer ); // in: AccountLines JSS ( peer_authorized ); // out: AccountLines JSS ( peer_id ); // out: LedgerProposal -JSS ( peers ); // out: InboundLedger, handlers/Peers +JSS ( peers ); // out: InboundLedger, handlers/Peers, Overlay JSS ( port ); // in: Connect JSS ( previous_ledger ); // out: LedgerPropose JSS ( proof ); // in: BookOffers @@ -344,6 +344,7 @@ JSS ( taker_pays ); // in: Subscribe, Unsubscribe, BookOffers JSS ( taker_pays_funded ); // out: NetworkOPs JSS ( threshold ); // in: Blacklist JSS ( timeouts ); // out: InboundLedger +JSS ( traffic ); // out: Overlay JSS ( totalCoins ); // out: LedgerToJson JSS ( total_coins ); // out: LedgerToJson JSS ( transTreeHash ); // out: ledger/Ledger.cpp diff --git a/src/ripple/unity/overlay.cpp b/src/ripple/unity/overlay.cpp index 24bb9c45ed..877199052d 100644 --- a/src/ripple/unity/overlay.cpp +++ b/src/ripple/unity/overlay.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include