diff --git a/src/ripple/algorithm/api/DecayingSample.h b/src/ripple/algorithm/api/DecayingSample.h index 85c848a292..28168de9f1 100644 --- a/src/ripple/algorithm/api/DecayingSample.h +++ b/src/ripple/algorithm/api/DecayingSample.h @@ -23,18 +23,19 @@ namespace ripple { /** Sampling function using exponential decay to provide a continuous value. */ -template +template class DecayingSample { public: - typedef Value value_type; + typedef typename Clock::duration::rep value_type; typedef typename Clock::time_point time_point; - // No default constructed DecayingSamples allowed DecayingSample () = delete; - /** Create a default constructed sample. */ - DecayingSample (time_point now) + /** + @param now Start time of DecayingSample. + */ + explicit DecayingSample (time_point now) : m_value (value_type()) , m_when (now) { @@ -43,7 +44,7 @@ public: /** Add a new sample. The value is first aged according to the specified time. */ - Value add (value_type value, time_point now) + value_type add (value_type value, time_point now) { decay (now); m_value += value; @@ -53,7 +54,7 @@ public: /** Retrieve the current value in normalized units. The samples are first aged according to the specified time. */ - Value value (time_point now) + value_type value (time_point now) { decay (now); return m_value / Window; @@ -75,7 +76,9 @@ private: // typename Clock::duration window (Window); if (n > 4 * window) + { m_value = value_type(); + } else { value_type const tick_value = 1; diff --git a/src/ripple/resource/README.md b/src/ripple/resource/README.md index 2734c54486..fa8370b615 100644 --- a/src/ripple/resource/README.md +++ b/src/ripple/resource/README.md @@ -33,39 +33,45 @@ Although RPC connections consume resources, they are transient and cannot be rate limited. It is advised not to expose RPC interfaces to the general public. +## Consumer Types ## + +Consumers are placed into three classifications (as identified by the +Resource::Kind enumeration): + + - InBound, + - OutBound, and + - Admin + + Each caller determines for itself the classification of the Consumer it is + creating. + ## Resource Loading ## -It is expected that when a client first connects to a server it will -impose a higher load on the server. The client may need to catch up -on transactions they've missed. The client may need to get trust lines -or transfer fees. The Manager must expect this initial peak load, but -not allow that high load to continue because, over the long term, that -would unduly stress the server. +It is expected that a client will impose a higher load on the server +when it first connects: the client may need to catch up on transactions +it has missed, or get trust lines, or transfer fees. The Manager must +expect this initial peak load, but not allow that high load to continue +because over the long term that would unduly stress the server. If a client places a sustained high load on the server, that client -is initially given a warning message. If the high load continues +is initially given a warning message. If that high load continues the Manager may tell the heavily loaded server to drop the connection entirely and not allow re-connection for some amount of time. -Each load is monitored using a "peaking" scheme implemented using the -DecayingSample class. DecayingSample captures peaks and then decays -those peak values over time. +Each load is monitored by capturing peaks and then decaying those peak +values over time: this is implemented by the DecayingSample class. ## Gossip ## Each server in a cluster creates a list of IP addresses of end points -that are imposing a significant load. The server passes that list to -other servers in the cluster. Those lists are called "Gossip". They -allow the individual servers in the cluster to potentially identify a -set of IP addresses that are unduly loading the entire cluster. Again -the recourse of the individual servers is to drop connections to those -IP addresses that occur commonly in the gossip. -identify +that are imposing a significant load. This list is called Gossip, which +is passed to other nodes in that cluster. Gossip helps individual +servers in the cluster identify IP addreses that might be unduly loading +the entire cluster. Again the recourse of the individual servers is to +drop connections to those IP addresses that occur commonly in the gossip. ## Access ## -Although the Resource::Manager does nothing to enforce this, in -rippled there is a single instance of the Resource::Manager. That -Resource::Manager is held by the Application. Entities that wish -to use the shared Resource::Manager can access it by calling -getResourceManager() on the Application. +In rippled, the Application holds a unique instance of Resource::Manager, +which may be retrieved by calling the method +`Application::getResourceManager()`. diff --git a/src/ripple/resource/impl/Consumer.cpp b/src/ripple/resource/impl/Consumer.cpp index 0a44d1e16d..66c222e9fd 100644 --- a/src/ripple/resource/impl/Consumer.cpp +++ b/src/ripple/resource/impl/Consumer.cpp @@ -36,43 +36,31 @@ Consumer::Consumer (Consumer const& other) : m_logic (other.m_logic) , m_entry (nullptr) { - if (m_logic != nullptr) + if (m_logic && other.m_entry) { - if (other.m_entry != nullptr) - { - m_entry = other.m_entry; - m_logic->acquire (*m_entry); - } + m_entry = other.m_entry; + m_logic->acquire (*m_entry); } } Consumer::~Consumer() { - if (m_logic != nullptr) - { - if (m_entry != nullptr) - m_logic->release (*m_entry); - } + if (m_logic && m_entry) + m_logic->release (*m_entry); } Consumer& Consumer::operator= (Consumer const& other) { // remove old ref - if (m_logic != nullptr) - { - if (m_entry != nullptr) - m_logic->release (*m_entry); - } + if (m_logic && m_entry) + m_logic->release (*m_entry); m_logic = other.m_logic; m_entry = other.m_entry; - + // add new ref - if (m_logic != nullptr) - { - if (m_entry != nullptr) - m_logic->acquire (*m_entry); - } + if (m_logic && m_entry) + m_logic->acquire (*m_entry); return *this; } @@ -87,7 +75,7 @@ std::string Consumer::to_string () const bool Consumer::admin () const { - if (m_entry != nullptr) + if (m_entry) return m_entry->admin(); return false; @@ -101,7 +89,11 @@ void Consumer::elevate (std::string const& name) Disposition Consumer::disposition() const { - return ok; + Disposition d = ok; + if (m_logic && m_entry) + d = m_logic->charge(*m_entry, Charge(0)); + + return d; } Disposition Consumer::charge (Charge const& what) diff --git a/src/ripple/resource/impl/Entry.h b/src/ripple/resource/impl/Entry.h index e8e14073a6..f0c7def2a7 100644 --- a/src/ripple/resource/impl/Entry.h +++ b/src/ripple/resource/impl/Entry.h @@ -28,15 +28,15 @@ typedef beast::abstract_clock clock_type; // An entry in the table struct Entry : public beast::List ::Node { - // No default constructor Entry () = delete; - // Each Entry needs to know what time it is constructed + /** + @param now Construction time of Entry. + */ explicit Entry(clock_type::time_point const now) : refcount (0) , local_balance (now) , remote_balance (0) - , disposition (ok) , lastWarningTime (0) , whenExpires (0) { @@ -87,9 +87,6 @@ struct Entry : public beast::List ::Node // Normalized balance contribution from imports int remote_balance; - // Disposition - Disposition disposition; - // Time of the last warning clock_type::rep lastWarningTime; diff --git a/src/ripple/resource/impl/Key.h b/src/ripple/resource/impl/Key.h index 56c15c017c..e37d146689 100644 --- a/src/ripple/resource/impl/Key.h +++ b/src/ripple/resource/impl/Key.h @@ -30,10 +30,9 @@ struct Key beast::IP::Endpoint address; std::string name; - // No default constructor Key () = delete; - // Convenience constructors + // Constructor for Inbound and Outbound (non-Admin) keys Key (Kind k, beast::IP::Endpoint const& addr) : kind(k) , address(addr) @@ -42,6 +41,7 @@ struct Key assert(kind != kindAdmin); } + // Constructor for Admin keys Key (Kind k, const std::string& n) : kind(k) , address() diff --git a/src/ripple/resource/impl/Logic.h b/src/ripple/resource/impl/Logic.h index 6e09a653bd..306852039a 100644 --- a/src/ripple/resource/impl/Logic.h +++ b/src/ripple/resource/impl/Logic.h @@ -113,16 +113,19 @@ public: SharedState::Access state (m_state); std::pair result ( state->table.emplace (std::piecewise_construct, - std::make_tuple(kindInbound, address.at_port (0)), // Key - std::make_tuple(m_clock.now()))); // Entry + std::make_tuple (kindInbound, address.at_port (0)), // Key + std::make_tuple (m_clock.now()))); // Entry + entry = &result.first->second; entry->key = &result.first->first; ++entry->refcount; if (entry->refcount == 1) { if (! result.second) + { state->inactive.erase ( state->inactive.iterator_to (*entry)); + } state->inbound.push_back (*entry); } } @@ -144,8 +147,9 @@ public: SharedState::Access state (m_state); std::pair result ( state->table.emplace (std::piecewise_construct, - std::make_tuple(kindOutbound, address), // Key - std::make_tuple(m_clock.now()))); // Entry + std::make_tuple (kindOutbound, address), // Key + std::make_tuple (m_clock.now()))); // Entry + entry = &result.first->second; entry->key = &result.first->first; ++entry->refcount; @@ -172,8 +176,9 @@ public: SharedState::Access state (m_state); std::pair result ( state->table.emplace (std::piecewise_construct, - std::make_tuple(kindAdmin, name), - std::make_tuple(m_clock.now()))); + std::make_tuple (kindAdmin, name), // Key + std::make_tuple (m_clock.now()))); // Entry + entry = &result.first->second; entry->key = &result.first->first; ++entry->refcount; @@ -203,8 +208,9 @@ public: SharedState::Access state (m_state); std::pair result ( state->table.emplace (std::piecewise_construct, - std::make_tuple(kindAdmin, name), - std::make_tuple(m_clock.now()))); + std::make_tuple (kindAdmin, name), // Key + std::make_tuple (m_clock.now()))); // Entry + entry = &result.first->second; entry->key = &result.first->first; ++entry->refcount; @@ -234,41 +240,38 @@ public: Json::Value ret (Json::objectValue); SharedState::Access state (m_state); - for (EntryIntrusiveList::iterator iter (state->inbound.begin()); - iter != state->inbound.end(); ++iter) + for (auto& inboundEntry : state->inbound) { - int localBalance = iter->local_balance.value (now); - if ((localBalance + iter->remote_balance) >= threshold) + int localBalance = inboundEntry.local_balance.value (now); + if ((localBalance + inboundEntry.remote_balance) >= threshold) { - Json::Value& entry = (ret[iter->to_string()] = Json::objectValue); + Json::Value& entry = (ret[inboundEntry.to_string()] = Json::objectValue); entry["local"] = localBalance; - entry["remote"] = iter->remote_balance; + entry["remote"] = inboundEntry.remote_balance; entry["type"] = "outbound"; } } - for (EntryIntrusiveList::iterator iter (state->outbound.begin()); - iter != state->outbound.end(); ++iter) + for (auto& outboundEntry : state->outbound) { - int localBalance = iter->local_balance.value (now); - if ((localBalance + iter->remote_balance) >= threshold) + int localBalance = outboundEntry.local_balance.value (now); + if ((localBalance + outboundEntry.remote_balance) >= threshold) { - Json::Value& entry = (ret[iter->to_string()] = Json::objectValue); + Json::Value& entry = (ret[outboundEntry.to_string()] = Json::objectValue); entry["local"] = localBalance; - entry["remote"] = iter->remote_balance; + entry["remote"] = outboundEntry.remote_balance; entry["type"] = "outbound"; } } - for (EntryIntrusiveList::iterator iter (state->admin.begin()); - iter != state->admin.end(); ++iter) + for (auto& adminEntry : state->admin) { - int localBalance = iter->local_balance.value (now); - if ((localBalance + iter->remote_balance) >= threshold) + int localBalance = adminEntry.local_balance.value (now); + if ((localBalance + adminEntry.remote_balance) >= threshold) { - Json::Value& entry = (ret[iter->to_string()] = Json::objectValue); + Json::Value& entry = (ret[adminEntry.to_string()] = Json::objectValue); entry["local"] = localBalance; - entry["remote"] = iter->remote_balance; + entry["remote"] = adminEntry.remote_balance; entry["type"] = "admin"; } @@ -286,14 +289,13 @@ public: gossip.items.reserve (state->inbound.size()); - for (EntryIntrusiveList::iterator iter (state->inbound.begin()); - iter != state->inbound.end(); ++iter) + for (auto& inboundEntry : state->inbound) { Gossip::Item item; - item.balance = iter->local_balance.value (now); + item.balance = inboundEntry.local_balance.value (now); if (item.balance >= minimumGossipBalance) { - item.address = iter->key->address; + item.address = inboundEntry.key->address; gossip.items.push_back (item); } } @@ -311,7 +313,7 @@ public: std::pair result ( state->import_table.emplace (std::piecewise_construct, std::make_tuple(origin), // Key - std::make_tuple(m_clock.elapsed()))); // Import + std::make_tuple(m_clock.elapsed()))); // Import if (result.second) { @@ -319,12 +321,12 @@ public: Import& next (result.first->second); next.whenExpires = elapsed + gossipExpirationSeconds; next.items.reserve (gossip.items.size()); - for (std::vector ::const_iterator iter (gossip.items.begin()); - iter != gossip.items.end(); ++iter) + + for (auto const& gossipItem : gossip.items) { Import::Item item; - item.balance = iter->balance; - item.consumer = newInboundEndpoint (iter->address); + item.balance = gossipItem.balance; + item.consumer = newInboundEndpoint (gossipItem.address); item.consumer.entry().remote_balance += item.balance; next.items.push_back (item); } @@ -337,21 +339,19 @@ public: Import next; next.whenExpires = elapsed + gossipExpirationSeconds; next.items.reserve (gossip.items.size()); - for (std::vector ::const_iterator iter (gossip.items.begin()); - iter != gossip.items.end(); ++iter) + for (auto const& gossipItem : gossip.items) { Import::Item item; - item.balance = iter->balance; - item.consumer = newInboundEndpoint (iter->address); + item.balance = gossipItem.balance; + item.consumer = newInboundEndpoint (gossipItem.address); item.consumer.entry().remote_balance += item.balance; next.items.push_back (item); } Import& prev (result.first->second); - for (std::vector ::iterator iter (prev.items.begin()); - iter != prev.items.end(); ++iter) + for (auto& item : prev.items) { - iter->consumer.entry().remote_balance -= iter->balance; + item.consumer.entry().remote_balance -= item.balance; } std::swap (next, prev); @@ -377,8 +377,7 @@ public: clock_type::rep const elapsed (m_clock.elapsed()); - for (EntryIntrusiveList::iterator iter ( - state->inactive.begin()); iter != state->inactive.end();) + for (auto iter (state->inactive.begin()); iter != state->inactive.end();) { if (iter->whenExpires <= elapsed) { @@ -400,7 +399,7 @@ public: Import& import (iter->second); if (iter->second.whenExpires <= elapsed) { - for (std::vector ::iterator item_iter (import.items.begin()); + for (auto item_iter (import.items.begin()); item_iter != import.items.end(); ++item_iter) { item_iter->consumer.entry().remote_balance -= item_iter->balance; @@ -506,13 +505,21 @@ public: { bool drop (false); clock_type::time_point const now (m_clock.now()); - if (entry.balance (now) >= dropThreshold) + int const balance (entry.balance (now)); + if (balance >= dropThreshold) { + m_journal.warning << + "Consumer entry " << entry << + " dropped with balance " << balance << + " at or above drop threshold " << dropThreshold; + + // Adding feeDrop at this point keeps the dropped connection + // from re-connecting for at least a little while after it is + // dropped. charge (entry, feeDrop, state); + ++m_stats.drop; drop = true; } - if (drop) - ++m_stats.drop; return drop; } @@ -572,16 +579,15 @@ public: beast::PropertyStream::Set& items, EntryIntrusiveList& list) { - for (EntryIntrusiveList::iterator iter (list.begin()); - iter != list.end(); ++iter) + for (auto& entry : list) { beast::PropertyStream::Map item (items); - if (iter->refcount != 0) - item ["count"] = iter->refcount; - item ["name"] = iter->to_string(); - item ["balance"] = iter->balance(now); - if (iter->remote_balance != 0) - item ["remote_balance"] = iter->remote_balance; + if (entry.refcount != 0) + item ["count"] = entry.refcount; + item ["name"] = entry.to_string(); + item ["balance"] = entry.balance(now); + if (entry.remote_balance != 0) + item ["remote_balance"] = entry.remote_balance; } } diff --git a/src/ripple/resource/impl/Tests.cpp b/src/ripple/resource/impl/Tests.cpp index 119db369dd..2a11165306 100644 --- a/src/ripple/resource/impl/Tests.cpp +++ b/src/ripple/resource/impl/Tests.cpp @@ -87,12 +87,13 @@ public: Charge const fee (dropThreshold + 1); beast::IP::Endpoint const addr ( beast::IP::Endpoint::from_string ("207.127.82.2")); - + { Consumer c (logic.newInboundEndpoint (addr)); // Create load until we get a warning - for (std::size_t n (maxLoopCount); true; --n) + std::size_t n (maxLoopCount); + while (--n > 0) { if (n == 0) { @@ -109,7 +110,7 @@ public: } // Create load until we get dropped - for (std::size_t n (maxLoopCount); true; --n) + while (--n > 0) { if (n == 0) { @@ -119,34 +120,49 @@ public: if (c.charge (fee) == drop) { - pass (); + // Disconnect abusive Consumer + expect (c.disconnect ()); break; } ++logic.clock (); } - } + // Make sure the consumer is on the blacklist for a while. { Consumer c (logic.newInboundEndpoint (addr)); - expect (c.disconnect ()); - } - - for (std::size_t n (maxLoopCount); true; --n) - { - Consumer c (logic.newInboundEndpoint (addr)); - if (n == 0) + logic.periodicActivity(); + if (c.disposition () != drop) { - fail ("Loop count exceeded without expiring black list"); + fail ("Dropped consumer not put on blacklist"); return; } + } - if (c.disposition() != drop) + // Makes sure the Consumer is eventually removed from blacklist + bool readmitted = false; + { + // Give Consumer time to become readmitted. Should never + // exceed expiration time. + std::size_t n (secondsUntilExpiration + 1); + while (--n > 0) { - pass (); - break; + ++logic.clock (); + logic.periodicActivity(); + Consumer c (logic.newInboundEndpoint (addr)); + if (c.disposition () != drop) + { + readmitted = true; + break; + } } } + if (readmitted == false) + { + fail ("Dropped Consumer left on blacklist too long"); + return; + } + pass(); } void testImports (beast::Journal j)