Tidy up Resource::Manager (RIPD-362):

* Style improvements
* More documentation
This commit is contained in:
Scott Schurr
2014-07-10 16:30:24 -07:00
committed by Vinnie Falco
parent 28898031f0
commit 5869902f2c
7 changed files with 153 additions and 133 deletions

View File

@@ -23,18 +23,19 @@
namespace ripple { namespace ripple {
/** Sampling function using exponential decay to provide a continuous value. */ /** Sampling function using exponential decay to provide a continuous value. */
template <int Window, typename Clock, typename Value = int> template <int Window, typename Clock>
class DecayingSample class DecayingSample
{ {
public: public:
typedef Value value_type; typedef typename Clock::duration::rep value_type;
typedef typename Clock::time_point time_point; typedef typename Clock::time_point time_point;
// No default constructed DecayingSamples allowed
DecayingSample () = delete; DecayingSample () = delete;
/** Create a default constructed sample. */ /**
DecayingSample (time_point now) @param now Start time of DecayingSample.
*/
explicit DecayingSample (time_point now)
: m_value (value_type()) : m_value (value_type())
, m_when (now) , m_when (now)
{ {
@@ -43,7 +44,7 @@ public:
/** Add a new sample. /** Add a new sample.
The value is first aged according to the specified time. The value is first aged according to the specified time.
*/ */
Value add (value_type value, time_point now) value_type add (value_type value, time_point now)
{ {
decay (now); decay (now);
m_value += value; m_value += value;
@@ -53,7 +54,7 @@ public:
/** Retrieve the current value in normalized units. /** Retrieve the current value in normalized units.
The samples are first aged according to the specified time. The samples are first aged according to the specified time.
*/ */
Value value (time_point now) value_type value (time_point now)
{ {
decay (now); decay (now);
return m_value / Window; return m_value / Window;
@@ -75,7 +76,9 @@ private:
// //
typename Clock::duration window (Window); typename Clock::duration window (Window);
if (n > 4 * window) if (n > 4 * window)
{
m_value = value_type(); m_value = value_type();
}
else else
{ {
value_type const tick_value = 1; value_type const tick_value = 1;

View File

@@ -33,39 +33,45 @@ Although RPC connections consume resources, they are transient and
cannot be rate limited. It is advised not to expose RPC interfaces cannot be rate limited. It is advised not to expose RPC interfaces
to the general public. to the general public.
## Consumer Types ##
Consumers are placed into three classifications (as identified by the
Resource::Kind enumeration):
- InBound,
- OutBound, and
- Admin
Each caller determines for itself the classification of the Consumer it is
creating.
## Resource Loading ## ## Resource Loading ##
It is expected that when a client first connects to a server it will It is expected that a client will impose a higher load on the server
impose a higher load on the server. The client may need to catch up when it first connects: the client may need to catch up on transactions
on transactions they've missed. The client may need to get trust lines it has missed, or get trust lines, or transfer fees. The Manager must
or transfer fees. The Manager must expect this initial peak load, but expect this initial peak load, but not allow that high load to continue
not allow that high load to continue because, over the long term, that because over the long term that would unduly stress the server.
would unduly stress the server.
If a client places a sustained high load on the server, that client If a client places a sustained high load on the server, that client
is initially given a warning message. If the high load continues is initially given a warning message. If that high load continues
the Manager may tell the heavily loaded server to drop the connection the Manager may tell the heavily loaded server to drop the connection
entirely and not allow re-connection for some amount of time. entirely and not allow re-connection for some amount of time.
Each load is monitored using a "peaking" scheme implemented using the Each load is monitored by capturing peaks and then decaying those peak
DecayingSample class. DecayingSample captures peaks and then decays values over time: this is implemented by the DecayingSample class.
those peak values over time.
## Gossip ## ## Gossip ##
Each server in a cluster creates a list of IP addresses of end points Each server in a cluster creates a list of IP addresses of end points
that are imposing a significant load. The server passes that list to that are imposing a significant load. This list is called Gossip, which
other servers in the cluster. Those lists are called "Gossip". They is passed to other nodes in that cluster. Gossip helps individual
allow the individual servers in the cluster to potentially identify a servers in the cluster identify IP addreses that might be unduly loading
set of IP addresses that are unduly loading the entire cluster. Again the entire cluster. Again the recourse of the individual servers is to
the recourse of the individual servers is to drop connections to those drop connections to those IP addresses that occur commonly in the gossip.
IP addresses that occur commonly in the gossip.
identify
## Access ## ## Access ##
Although the Resource::Manager does nothing to enforce this, in In rippled, the Application holds a unique instance of Resource::Manager,
rippled there is a single instance of the Resource::Manager. That which may be retrieved by calling the method
Resource::Manager is held by the Application. Entities that wish `Application::getResourceManager()`.
to use the shared Resource::Manager can access it by calling
getResourceManager() on the Application.

View File

@@ -36,43 +36,31 @@ Consumer::Consumer (Consumer const& other)
: m_logic (other.m_logic) : m_logic (other.m_logic)
, m_entry (nullptr) , m_entry (nullptr)
{ {
if (m_logic != nullptr) if (m_logic && other.m_entry)
{ {
if (other.m_entry != nullptr) m_entry = other.m_entry;
{ m_logic->acquire (*m_entry);
m_entry = other.m_entry;
m_logic->acquire (*m_entry);
}
} }
} }
Consumer::~Consumer() Consumer::~Consumer()
{ {
if (m_logic != nullptr) if (m_logic && m_entry)
{ m_logic->release (*m_entry);
if (m_entry != nullptr)
m_logic->release (*m_entry);
}
} }
Consumer& Consumer::operator= (Consumer const& other) Consumer& Consumer::operator= (Consumer const& other)
{ {
// remove old ref // remove old ref
if (m_logic != nullptr) if (m_logic && m_entry)
{ m_logic->release (*m_entry);
if (m_entry != nullptr)
m_logic->release (*m_entry);
}
m_logic = other.m_logic; m_logic = other.m_logic;
m_entry = other.m_entry; m_entry = other.m_entry;
// add new ref // add new ref
if (m_logic != nullptr) if (m_logic && m_entry)
{ m_logic->acquire (*m_entry);
if (m_entry != nullptr)
m_logic->acquire (*m_entry);
}
return *this; return *this;
} }
@@ -87,7 +75,7 @@ std::string Consumer::to_string () const
bool Consumer::admin () const bool Consumer::admin () const
{ {
if (m_entry != nullptr) if (m_entry)
return m_entry->admin(); return m_entry->admin();
return false; return false;
@@ -101,7 +89,11 @@ void Consumer::elevate (std::string const& name)
Disposition Consumer::disposition() const Disposition Consumer::disposition() const
{ {
return ok; Disposition d = ok;
if (m_logic && m_entry)
d = m_logic->charge(*m_entry, Charge(0));
return d;
} }
Disposition Consumer::charge (Charge const& what) Disposition Consumer::charge (Charge const& what)

View File

@@ -28,15 +28,15 @@ typedef beast::abstract_clock <std::chrono::seconds> clock_type;
// An entry in the table // An entry in the table
struct Entry : public beast::List <Entry>::Node struct Entry : public beast::List <Entry>::Node
{ {
// No default constructor
Entry () = delete; Entry () = delete;
// Each Entry needs to know what time it is constructed /**
@param now Construction time of Entry.
*/
explicit Entry(clock_type::time_point const now) explicit Entry(clock_type::time_point const now)
: refcount (0) : refcount (0)
, local_balance (now) , local_balance (now)
, remote_balance (0) , remote_balance (0)
, disposition (ok)
, lastWarningTime (0) , lastWarningTime (0)
, whenExpires (0) , whenExpires (0)
{ {
@@ -87,9 +87,6 @@ struct Entry : public beast::List <Entry>::Node
// Normalized balance contribution from imports // Normalized balance contribution from imports
int remote_balance; int remote_balance;
// Disposition
Disposition disposition;
// Time of the last warning // Time of the last warning
clock_type::rep lastWarningTime; clock_type::rep lastWarningTime;

View File

@@ -30,10 +30,9 @@ struct Key
beast::IP::Endpoint address; beast::IP::Endpoint address;
std::string name; std::string name;
// No default constructor
Key () = delete; Key () = delete;
// Convenience constructors // Constructor for Inbound and Outbound (non-Admin) keys
Key (Kind k, beast::IP::Endpoint const& addr) Key (Kind k, beast::IP::Endpoint const& addr)
: kind(k) : kind(k)
, address(addr) , address(addr)
@@ -42,6 +41,7 @@ struct Key
assert(kind != kindAdmin); assert(kind != kindAdmin);
} }
// Constructor for Admin keys
Key (Kind k, const std::string& n) Key (Kind k, const std::string& n)
: kind(k) : kind(k)
, address() , address()

View File

@@ -113,16 +113,19 @@ public:
SharedState::Access state (m_state); SharedState::Access state (m_state);
std::pair <Table::iterator, bool> result ( std::pair <Table::iterator, bool> result (
state->table.emplace (std::piecewise_construct, state->table.emplace (std::piecewise_construct,
std::make_tuple(kindInbound, address.at_port (0)), // Key std::make_tuple (kindInbound, address.at_port (0)), // Key
std::make_tuple(m_clock.now()))); // Entry std::make_tuple (m_clock.now()))); // Entry
entry = &result.first->second; entry = &result.first->second;
entry->key = &result.first->first; entry->key = &result.first->first;
++entry->refcount; ++entry->refcount;
if (entry->refcount == 1) if (entry->refcount == 1)
{ {
if (! result.second) if (! result.second)
{
state->inactive.erase ( state->inactive.erase (
state->inactive.iterator_to (*entry)); state->inactive.iterator_to (*entry));
}
state->inbound.push_back (*entry); state->inbound.push_back (*entry);
} }
} }
@@ -144,8 +147,9 @@ public:
SharedState::Access state (m_state); SharedState::Access state (m_state);
std::pair <Table::iterator, bool> result ( std::pair <Table::iterator, bool> result (
state->table.emplace (std::piecewise_construct, state->table.emplace (std::piecewise_construct,
std::make_tuple(kindOutbound, address), // Key std::make_tuple (kindOutbound, address), // Key
std::make_tuple(m_clock.now()))); // Entry std::make_tuple (m_clock.now()))); // Entry
entry = &result.first->second; entry = &result.first->second;
entry->key = &result.first->first; entry->key = &result.first->first;
++entry->refcount; ++entry->refcount;
@@ -172,8 +176,9 @@ public:
SharedState::Access state (m_state); SharedState::Access state (m_state);
std::pair <Table::iterator, bool> result ( std::pair <Table::iterator, bool> result (
state->table.emplace (std::piecewise_construct, state->table.emplace (std::piecewise_construct,
std::make_tuple(kindAdmin, name), std::make_tuple (kindAdmin, name), // Key
std::make_tuple(m_clock.now()))); std::make_tuple (m_clock.now()))); // Entry
entry = &result.first->second; entry = &result.first->second;
entry->key = &result.first->first; entry->key = &result.first->first;
++entry->refcount; ++entry->refcount;
@@ -203,8 +208,9 @@ public:
SharedState::Access state (m_state); SharedState::Access state (m_state);
std::pair <Table::iterator, bool> result ( std::pair <Table::iterator, bool> result (
state->table.emplace (std::piecewise_construct, state->table.emplace (std::piecewise_construct,
std::make_tuple(kindAdmin, name), std::make_tuple (kindAdmin, name), // Key
std::make_tuple(m_clock.now()))); std::make_tuple (m_clock.now()))); // Entry
entry = &result.first->second; entry = &result.first->second;
entry->key = &result.first->first; entry->key = &result.first->first;
++entry->refcount; ++entry->refcount;
@@ -234,41 +240,38 @@ public:
Json::Value ret (Json::objectValue); Json::Value ret (Json::objectValue);
SharedState::Access state (m_state); SharedState::Access state (m_state);
for (EntryIntrusiveList::iterator iter (state->inbound.begin()); for (auto& inboundEntry : state->inbound)
iter != state->inbound.end(); ++iter)
{ {
int localBalance = iter->local_balance.value (now); int localBalance = inboundEntry.local_balance.value (now);
if ((localBalance + iter->remote_balance) >= threshold) if ((localBalance + inboundEntry.remote_balance) >= threshold)
{ {
Json::Value& entry = (ret[iter->to_string()] = Json::objectValue); Json::Value& entry = (ret[inboundEntry.to_string()] = Json::objectValue);
entry["local"] = localBalance; entry["local"] = localBalance;
entry["remote"] = iter->remote_balance; entry["remote"] = inboundEntry.remote_balance;
entry["type"] = "outbound"; entry["type"] = "outbound";
} }
} }
for (EntryIntrusiveList::iterator iter (state->outbound.begin()); for (auto& outboundEntry : state->outbound)
iter != state->outbound.end(); ++iter)
{ {
int localBalance = iter->local_balance.value (now); int localBalance = outboundEntry.local_balance.value (now);
if ((localBalance + iter->remote_balance) >= threshold) if ((localBalance + outboundEntry.remote_balance) >= threshold)
{ {
Json::Value& entry = (ret[iter->to_string()] = Json::objectValue); Json::Value& entry = (ret[outboundEntry.to_string()] = Json::objectValue);
entry["local"] = localBalance; entry["local"] = localBalance;
entry["remote"] = iter->remote_balance; entry["remote"] = outboundEntry.remote_balance;
entry["type"] = "outbound"; entry["type"] = "outbound";
} }
} }
for (EntryIntrusiveList::iterator iter (state->admin.begin()); for (auto& adminEntry : state->admin)
iter != state->admin.end(); ++iter)
{ {
int localBalance = iter->local_balance.value (now); int localBalance = adminEntry.local_balance.value (now);
if ((localBalance + iter->remote_balance) >= threshold) if ((localBalance + adminEntry.remote_balance) >= threshold)
{ {
Json::Value& entry = (ret[iter->to_string()] = Json::objectValue); Json::Value& entry = (ret[adminEntry.to_string()] = Json::objectValue);
entry["local"] = localBalance; entry["local"] = localBalance;
entry["remote"] = iter->remote_balance; entry["remote"] = adminEntry.remote_balance;
entry["type"] = "admin"; entry["type"] = "admin";
} }
@@ -286,14 +289,13 @@ public:
gossip.items.reserve (state->inbound.size()); gossip.items.reserve (state->inbound.size());
for (EntryIntrusiveList::iterator iter (state->inbound.begin()); for (auto& inboundEntry : state->inbound)
iter != state->inbound.end(); ++iter)
{ {
Gossip::Item item; Gossip::Item item;
item.balance = iter->local_balance.value (now); item.balance = inboundEntry.local_balance.value (now);
if (item.balance >= minimumGossipBalance) if (item.balance >= minimumGossipBalance)
{ {
item.address = iter->key->address; item.address = inboundEntry.key->address;
gossip.items.push_back (item); gossip.items.push_back (item);
} }
} }
@@ -311,7 +313,7 @@ public:
std::pair <Imports::iterator, bool> result ( std::pair <Imports::iterator, bool> result (
state->import_table.emplace (std::piecewise_construct, state->import_table.emplace (std::piecewise_construct,
std::make_tuple(origin), // Key std::make_tuple(origin), // Key
std::make_tuple(m_clock.elapsed()))); // Import std::make_tuple(m_clock.elapsed()))); // Import
if (result.second) if (result.second)
{ {
@@ -319,12 +321,12 @@ public:
Import& next (result.first->second); Import& next (result.first->second);
next.whenExpires = elapsed + gossipExpirationSeconds; next.whenExpires = elapsed + gossipExpirationSeconds;
next.items.reserve (gossip.items.size()); next.items.reserve (gossip.items.size());
for (std::vector <Gossip::Item>::const_iterator iter (gossip.items.begin());
iter != gossip.items.end(); ++iter) for (auto const& gossipItem : gossip.items)
{ {
Import::Item item; Import::Item item;
item.balance = iter->balance; item.balance = gossipItem.balance;
item.consumer = newInboundEndpoint (iter->address); item.consumer = newInboundEndpoint (gossipItem.address);
item.consumer.entry().remote_balance += item.balance; item.consumer.entry().remote_balance += item.balance;
next.items.push_back (item); next.items.push_back (item);
} }
@@ -337,21 +339,19 @@ public:
Import next; Import next;
next.whenExpires = elapsed + gossipExpirationSeconds; next.whenExpires = elapsed + gossipExpirationSeconds;
next.items.reserve (gossip.items.size()); next.items.reserve (gossip.items.size());
for (std::vector <Gossip::Item>::const_iterator iter (gossip.items.begin()); for (auto const& gossipItem : gossip.items)
iter != gossip.items.end(); ++iter)
{ {
Import::Item item; Import::Item item;
item.balance = iter->balance; item.balance = gossipItem.balance;
item.consumer = newInboundEndpoint (iter->address); item.consumer = newInboundEndpoint (gossipItem.address);
item.consumer.entry().remote_balance += item.balance; item.consumer.entry().remote_balance += item.balance;
next.items.push_back (item); next.items.push_back (item);
} }
Import& prev (result.first->second); Import& prev (result.first->second);
for (std::vector <Import::Item>::iterator iter (prev.items.begin()); for (auto& item : prev.items)
iter != prev.items.end(); ++iter)
{ {
iter->consumer.entry().remote_balance -= iter->balance; item.consumer.entry().remote_balance -= item.balance;
} }
std::swap (next, prev); std::swap (next, prev);
@@ -377,8 +377,7 @@ public:
clock_type::rep const elapsed (m_clock.elapsed()); clock_type::rep const elapsed (m_clock.elapsed());
for (EntryIntrusiveList::iterator iter ( for (auto iter (state->inactive.begin()); iter != state->inactive.end();)
state->inactive.begin()); iter != state->inactive.end();)
{ {
if (iter->whenExpires <= elapsed) if (iter->whenExpires <= elapsed)
{ {
@@ -400,7 +399,7 @@ public:
Import& import (iter->second); Import& import (iter->second);
if (iter->second.whenExpires <= elapsed) if (iter->second.whenExpires <= elapsed)
{ {
for (std::vector <Import::Item>::iterator item_iter (import.items.begin()); for (auto item_iter (import.items.begin());
item_iter != import.items.end(); ++item_iter) item_iter != import.items.end(); ++item_iter)
{ {
item_iter->consumer.entry().remote_balance -= item_iter->balance; item_iter->consumer.entry().remote_balance -= item_iter->balance;
@@ -506,13 +505,21 @@ public:
{ {
bool drop (false); bool drop (false);
clock_type::time_point const now (m_clock.now()); clock_type::time_point const now (m_clock.now());
if (entry.balance (now) >= dropThreshold) int const balance (entry.balance (now));
if (balance >= dropThreshold)
{ {
m_journal.warning <<
"Consumer entry " << entry <<
" dropped with balance " << balance <<
" at or above drop threshold " << dropThreshold;
// Adding feeDrop at this point keeps the dropped connection
// from re-connecting for at least a little while after it is
// dropped.
charge (entry, feeDrop, state); charge (entry, feeDrop, state);
++m_stats.drop;
drop = true; drop = true;
} }
if (drop)
++m_stats.drop;
return drop; return drop;
} }
@@ -572,16 +579,15 @@ public:
beast::PropertyStream::Set& items, beast::PropertyStream::Set& items,
EntryIntrusiveList& list) EntryIntrusiveList& list)
{ {
for (EntryIntrusiveList::iterator iter (list.begin()); for (auto& entry : list)
iter != list.end(); ++iter)
{ {
beast::PropertyStream::Map item (items); beast::PropertyStream::Map item (items);
if (iter->refcount != 0) if (entry.refcount != 0)
item ["count"] = iter->refcount; item ["count"] = entry.refcount;
item ["name"] = iter->to_string(); item ["name"] = entry.to_string();
item ["balance"] = iter->balance(now); item ["balance"] = entry.balance(now);
if (iter->remote_balance != 0) if (entry.remote_balance != 0)
item ["remote_balance"] = iter->remote_balance; item ["remote_balance"] = entry.remote_balance;
} }
} }

View File

@@ -87,12 +87,13 @@ public:
Charge const fee (dropThreshold + 1); Charge const fee (dropThreshold + 1);
beast::IP::Endpoint const addr ( beast::IP::Endpoint const addr (
beast::IP::Endpoint::from_string ("207.127.82.2")); beast::IP::Endpoint::from_string ("207.127.82.2"));
{ {
Consumer c (logic.newInboundEndpoint (addr)); Consumer c (logic.newInboundEndpoint (addr));
// Create load until we get a warning // Create load until we get a warning
for (std::size_t n (maxLoopCount); true; --n) std::size_t n (maxLoopCount);
while (--n > 0)
{ {
if (n == 0) if (n == 0)
{ {
@@ -109,7 +110,7 @@ public:
} }
// Create load until we get dropped // Create load until we get dropped
for (std::size_t n (maxLoopCount); true; --n) while (--n > 0)
{ {
if (n == 0) if (n == 0)
{ {
@@ -119,34 +120,49 @@ public:
if (c.charge (fee) == drop) if (c.charge (fee) == drop)
{ {
pass (); // Disconnect abusive Consumer
expect (c.disconnect ());
break; break;
} }
++logic.clock (); ++logic.clock ();
} }
} }
// Make sure the consumer is on the blacklist for a while.
{ {
Consumer c (logic.newInboundEndpoint (addr)); Consumer c (logic.newInboundEndpoint (addr));
expect (c.disconnect ()); logic.periodicActivity();
} if (c.disposition () != drop)
for (std::size_t n (maxLoopCount); true; --n)
{
Consumer c (logic.newInboundEndpoint (addr));
if (n == 0)
{ {
fail ("Loop count exceeded without expiring black list"); fail ("Dropped consumer not put on blacklist");
return; return;
} }
}
if (c.disposition() != drop) // Makes sure the Consumer is eventually removed from blacklist
bool readmitted = false;
{
// Give Consumer time to become readmitted. Should never
// exceed expiration time.
std::size_t n (secondsUntilExpiration + 1);
while (--n > 0)
{ {
pass (); ++logic.clock ();
break; logic.periodicActivity();
Consumer c (logic.newInboundEndpoint (addr));
if (c.disposition () != drop)
{
readmitted = true;
break;
}
} }
} }
if (readmitted == false)
{
fail ("Dropped Consumer left on blacklist too long");
return;
}
pass();
} }
void testImports (beast::Journal j) void testImports (beast::Journal j)