Pass and use time_point in DecayingSample ctor (RIPD-359)

Switches a number of places in Resource::Logic to use abstract_clock::now()
which returns a time_point.  Unfortunately Resource::Logic tracks time
locally also, but with ints, not time_point.  So Resource::Logic uses a
delicate mix of abstract_clock::now() and abstract_clock::elapsed() with
this commit.  That inconsistency could be addressed in a second commit.
This commit is contained in:
Scott Schurr
2014-07-09 20:16:43 -07:00
committed by Vinnie Falco
parent d7e08f96a5
commit 9fb09d3109
8 changed files with 152 additions and 83 deletions

View File

@@ -23,26 +23,27 @@
namespace ripple {
/** Sampling function using exponential decay to provide a continuous value. */
template <int Window,
typename Value = int,
typename Elapsed = int>
template <int Window, typename Clock, typename Value = int>
class DecayingSample
{
public:
typedef Value value_type;
typedef Elapsed elapsed_type;
typedef Value value_type;
typedef typename Clock::time_point time_point;
// No default constructed DecayingSamples allowed
DecayingSample () = delete;
/** Create a default constructed sample. */
DecayingSample ()
DecayingSample (time_point now)
: m_value (value_type())
, m_when (elapsed_type())
, m_when (now)
{
}
/** Add a new sample.
The value is first aged according to the specified time.
*/
Value add (value_type value, elapsed_type now)
Value add (value_type value, time_point now)
{
decay (now);
m_value += value;
@@ -52,7 +53,7 @@ public:
/** Retrieve the current value in normalized units.
The samples are first aged according to the specified time.
*/
Value value (elapsed_type now)
Value value (time_point now)
{
decay (now);
return m_value / Window;
@@ -60,24 +61,31 @@ public:
private:
// Apply exponential decay based on the specified time.
void decay (elapsed_type now)
void decay (time_point now)
{
if (now == m_when)
return;
if (m_value != value_type())
{
elapsed_type n (now - m_when);
typename Clock::duration n (now - m_when);
// A span larger than four times the window decays the
// value to an insignificant amount so just reset it.
//
if (n > 4 * Window)
typename Clock::duration window (Window);
if (n > 4 * window)
m_value = value_type();
else
{
while (n--)
m_value -= (m_value + Window - 1) / Window;
value_type const tick_value = 1;
typename Clock::duration const tick (tick_value);
typename Clock::duration const zero (0);
while (n > zero)
{
n -= tick;
m_value -= (m_value + Window - tick_value) / Window;
}
}
}
@@ -88,7 +96,7 @@ private:
value_type m_value;
// Last time the aging function was applied
elapsed_type m_when;
time_point m_when;
};
}

View File

@@ -1,4 +1,4 @@
# ResourceManager
# Resource::Manager #
The ResourceManager module has these responsibilities:
@@ -7,11 +7,11 @@ The ResourceManager module has these responsibilities:
- Provide an interface to share load information in a cluster.
- Warn and/or disconnect endpoints for imposing load.
## Description
## Description ##
To prevent monopolization of server resources or attacks on servers,
resource consumption is monitored at each endpoint. When consumption
exceeds certain thresholds, costs are imposed. Costs include charging
exceeds certain thresholds, costs are imposed. Costs could include charging
additional XRP for transactions, requiring a proof of work to be
performed, or simply disconnecting the endpoint.
@@ -23,8 +23,49 @@ The current "balance" of a Consumer represents resource consumption
debt or credit. Debt is accrued when bad loads are imposed. Credit is
granted when good loads are imposed. When the balance crosses heuristic
thresholds, costs are increased on the endpoint. The balance is
represented as a unitless relative quantity.
represented as a unitless relative quantity. This balance is currently
held by the Entry struct in the impl/Entry.h file.
Costs associated with specific transactions are defined in the
impl/Fees files.
Although RPC connections consume resources, they are transient and
cannot be rate limited. It is advised not to expose RPC interfaces
to the general public.
## Resource Loading ##
It is expected that when a client first connects to a server it will
impose a higher load on the server. The client may need to catch up
on transactions they've missed. The client may need to get trust lines
or transfer fees. The Manager must expect this initial peak load, but
not allow that high load to continue because, over the long term, that
would unduly stress the server.
If a client places a sustained high load on the server, that client
is initially given a warning message. If the high load continues
the Manager may tell the heavily loaded server to drop the connection
entirely and not allow re-connection for some amount of time.
Each load is monitored using a "peaking" scheme implemented using the
DecayingSample class. DecayingSample captures peaks and then decays
those peak values over time.
## Gossip ##
Each server in a cluster creates a list of IP addresses of end points
that are imposing a significant load. The server passes that list to
other servers in the cluster. Those lists are called "Gossip". They
allow the individual servers in the cluster to potentially identify a
set of IP addresses that are unduly loading the entire cluster. Again
the recourse of the individual servers is to drop connections to those
IP addresses that occur commonly in the gossip.
identify
## Access ##
Although the Resource::Manager does nothing to enforce this, in
rippled there is a single instance of the Resource::Manager. That
Resource::Manager is held by the Application. Entities that wish
to use the shared Resource::Manager can access it by calling
getResourceManager() on the Application.

View File

@@ -33,8 +33,8 @@ public:
/** The type used to hold a consumption charge. */
typedef int value_type;
/** Create a new charge with no cost (yet). */
Charge ();
// A default constructed Charge has no way to get a label. Delete
Charge () = delete;
/** Create a charge with the specified cost and name. */
Charge (value_type cost, std::string const& label = std::string());

View File

@@ -22,10 +22,6 @@
namespace ripple {
namespace Resource {
Charge::Charge ()
: m_cost (0)
{
}
Charge::Charge (value_type cost, std::string const& label)
: m_cost (cost)

View File

@@ -28,9 +28,13 @@ typedef beast::abstract_clock <std::chrono::seconds> clock_type;
// An entry in the table
struct Entry : public beast::List <Entry>::Node
{
// Dummy argument is necessary for zero-copy construction of elements
Entry (int)
// No default constructor
Entry () = delete;
// Each Entry needs to know what time it is constructed
explicit Entry(clock_type::time_point const now)
: refcount (0)
, local_balance (now)
, remote_balance (0)
, disposition (ok)
, lastWarningTime (0)
@@ -59,14 +63,14 @@ struct Entry : public beast::List <Entry>::Node
}
// Balance including remote contributions
int balance (clock_type::rep const now)
int balance (clock_type::time_point const now)
{
return local_balance.value (now) + remote_balance;
}
// Add a charge and return normalized balance
// including contributions from imports.
int add (int charge, clock_type::rep const now)
int add (int charge, clock_type::time_point const now)
{
return local_balance.add (charge, now) + remote_balance;
}
@@ -78,7 +82,7 @@ struct Entry : public beast::List <Entry>::Node
int refcount;
// Exponentially decaying balance of resource consumption
DecayingSample <decayWindowSeconds> local_balance;
DecayingSample <decayWindowSeconds, clock_type> local_balance;
// Normalized balance contribution from imports
int remote_balance;

View File

@@ -30,6 +30,26 @@ struct Key
beast::IP::Endpoint address;
std::string name;
// No default constructor
Key () = delete;
// Convenience constructors
Key (Kind k, beast::IP::Endpoint const& addr)
: kind(k)
, address(addr)
, name()
{
assert(kind != kindAdmin);
}
Key (Kind k, const std::string& n)
: kind(k)
, address()
, name(n)
{
assert(kind == kindAdmin);
}
struct hasher
{
std::size_t operator() (Key const& v) const

View File

@@ -29,27 +29,32 @@ namespace Resource {
class Logic
{
public:
private:
typedef beast::abstract_clock <std::chrono::seconds> clock_type;
typedef ripple::unordered_map <std::string, Import> Imports;
typedef ripple::unordered_map <Key, Entry, Key::hasher, Key::key_equal> Table;
typedef beast::List <Entry> EntryIntrusiveList;
struct State
{
// Table of all entries
Table table;
// Because the following are intrusive lists, a given Entry may be in
// at most list at a given instant. The Entry must be removed from
// one list before placing it in another.
// List of all active inbound entries
beast::List <Entry> inbound;
EntryIntrusiveList inbound;
// List of all active outbound entries
beast::List <Entry> outbound;
EntryIntrusiveList outbound;
// List of all active admin entries
beast::List <Entry> admin;
EntryIntrusiveList admin;
// List of all inactve entries
beast::List <Entry> inactive;
EntryIntrusiveList inactive;
// All imported gossip data
Imports import_table;
@@ -75,6 +80,7 @@ public:
beast::Journal m_journal;
//--------------------------------------------------------------------------
public:
Logic (beast::insight::Collector::ptr const& collector,
clock_type& clock, beast::Journal journal)
@@ -101,16 +107,14 @@ public:
if (isWhitelisted (address))
return newAdminEndpoint (to_string (address));
Key key;
key.kind = kindInbound;
key.address = address.at_port (0);
Entry* entry (nullptr);
{
SharedState::Access state (m_state);
std::pair <Table::iterator, bool> result (
state->table.emplace (key, 0));
state->table.emplace (std::piecewise_construct,
std::make_tuple(kindInbound, address.at_port (0)), // Key
std::make_tuple(m_clock.now()))); // Entry
entry = &result.first->second;
entry->key = &result.first->first;
++entry->refcount;
@@ -134,16 +138,14 @@ public:
if (isWhitelisted (address))
return newAdminEndpoint (to_string (address));
Key key;
key.kind = kindOutbound;
key.address = address;
Entry* entry (nullptr);
{
SharedState::Access state (m_state);
std::pair <Table::iterator, bool> result (
state->table.emplace (key, 0));
state->table.emplace (std::piecewise_construct,
std::make_tuple(kindOutbound, address), // Key
std::make_tuple(m_clock.now()))); // Entry
entry = &result.first->second;
entry->key = &result.first->first;
++entry->refcount;
@@ -164,16 +166,14 @@ public:
Consumer newAdminEndpoint (std::string const& name)
{
Key key;
key.kind = kindAdmin;
key.name = name;
Entry* entry (nullptr);
{
SharedState::Access state (m_state);
std::pair <Table::iterator, bool> result (
state->table.emplace (key, 0));
state->table.emplace (std::piecewise_construct,
std::make_tuple(kindAdmin, name),
std::make_tuple(m_clock.now())));
entry = &result.first->second;
entry->key = &result.first->first;
++entry->refcount;
@@ -194,10 +194,6 @@ public:
Entry& elevateToAdminEndpoint (Entry& prior, std::string const& name)
{
Key key;
key.kind = kindAdmin;
key.name = name;
m_journal.info <<
"Elevate " << prior << " to " << name;
@@ -206,7 +202,9 @@ public:
{
SharedState::Access state (m_state);
std::pair <Table::iterator, bool> result (
state->table.emplace (key, 0));
state->table.emplace (std::piecewise_construct,
std::make_tuple(kindAdmin, name),
std::make_tuple(m_clock.now())));
entry = &result.first->second;
entry->key = &result.first->first;
++entry->refcount;
@@ -231,12 +229,12 @@ public:
Json::Value getJson (int threshold)
{
clock_type::rep const now (m_clock.elapsed());
clock_type::time_point const now (m_clock.now());
Json::Value ret (Json::objectValue);
SharedState::Access state (m_state);
for (beast::List <Entry>::iterator iter (state->inbound.begin());
for (EntryIntrusiveList::iterator iter (state->inbound.begin());
iter != state->inbound.end(); ++iter)
{
int localBalance = iter->local_balance.value (now);
@@ -249,7 +247,7 @@ public:
}
}
for (beast::List <Entry>::iterator iter (state->outbound.begin());
for (EntryIntrusiveList::iterator iter (state->outbound.begin());
iter != state->outbound.end(); ++iter)
{
int localBalance = iter->local_balance.value (now);
@@ -262,7 +260,7 @@ public:
}
}
for (beast::List <Entry>::iterator iter (state->admin.begin());
for (EntryIntrusiveList::iterator iter (state->admin.begin());
iter != state->admin.end(); ++iter)
{
int localBalance = iter->local_balance.value (now);
@@ -281,14 +279,14 @@ public:
Gossip exportConsumers ()
{
clock_type::rep const now (m_clock.elapsed());
clock_type::time_point const now (m_clock.now());
Gossip gossip;
SharedState::Access state (m_state);
gossip.items.reserve (state->inbound.size());
for (beast::List <Entry>::iterator iter (state->inbound.begin());
for (EntryIntrusiveList::iterator iter (state->inbound.begin());
iter != state->inbound.end(); ++iter)
{
Gossip::Item item;
@@ -307,18 +305,19 @@ public:
void importConsumers (std::string const& origin, Gossip const& gossip)
{
clock_type::rep const now (m_clock.elapsed());
clock_type::rep const elapsed (m_clock.elapsed());
{
SharedState::Access state (m_state);
std::pair <Imports::iterator, bool> result (
state->import_table.emplace (origin, 0));
state->import_table.emplace (std::piecewise_construct,
std::make_tuple(origin), // Key
std::make_tuple(m_clock.elapsed()))); // Import
if (result.second)
{
// This is a new import
Import& next (result.first->second);
next.whenExpires = now + gossipExpirationSeconds;
next.whenExpires = elapsed + gossipExpirationSeconds;
next.items.reserve (gossip.items.size());
for (std::vector <Gossip::Item>::const_iterator iter (gossip.items.begin());
iter != gossip.items.end(); ++iter)
@@ -336,7 +335,7 @@ public:
// balances and then deduct the old remote balances.
Import next;
next.whenExpires = now + gossipExpirationSeconds;
next.whenExpires = elapsed + gossipExpirationSeconds;
next.items.reserve (gossip.items.size());
for (std::vector <Gossip::Item>::const_iterator iter (gossip.items.begin());
iter != gossip.items.end(); ++iter)
@@ -376,12 +375,12 @@ public:
{
SharedState::Access state (m_state);
clock_type::rep const now (m_clock.elapsed());
clock_type::rep const elapsed (m_clock.elapsed());
for (beast::List <Entry>::iterator iter (
for (EntryIntrusiveList::iterator iter (
state->inactive.begin()); iter != state->inactive.end();)
{
if (iter->whenExpires <= now)
if (iter->whenExpires <= elapsed)
{
m_journal.debug << "Expired " << *iter;
Table::iterator table_iter (
@@ -399,7 +398,7 @@ public:
while (iter != state->import_table.end())
{
Import& import (iter->second);
if (iter->second.whenExpires <= now)
if (iter->second.whenExpires <= elapsed)
{
for (std::vector <Import::Item>::iterator item_iter (import.items.begin());
item_iter != import.items.end(); ++item_iter)
@@ -474,7 +473,7 @@ public:
Disposition charge (Entry& entry, Charge const& fee, SharedState::Access& state)
{
clock_type::rep const now (m_clock.elapsed());
clock_type::time_point const now (m_clock.now());
int const balance (entry.add (fee.cost(), now));
m_journal.trace <<
"Charging " << entry << " for " << fee;
@@ -484,12 +483,13 @@ public:
bool warn (Entry& entry, SharedState::Access& state)
{
bool notify (false);
clock_type::rep const now (m_clock.elapsed());
if (entry.balance (now) >= warningThreshold && now != entry.lastWarningTime)
clock_type::rep const elapsed (m_clock.elapsed());
if (entry.balance (m_clock.now()) >= warningThreshold &&
elapsed != entry.lastWarningTime)
{
charge (entry, feeWarning, state);
notify = true;
entry.lastWarningTime = now;
entry.lastWarningTime = elapsed;
}
if (notify)
@@ -505,7 +505,7 @@ public:
bool disconnect (Entry& entry, SharedState::Access& state)
{
bool drop (false);
clock_type::rep const now (m_clock.elapsed());
clock_type::time_point const now (m_clock.now());
if (entry.balance (now) >= dropThreshold)
{
charge (entry, feeDrop, state);
@@ -518,7 +518,7 @@ public:
int balance (Entry& entry, SharedState::Access& state)
{
return entry.balance (m_clock.elapsed());
return entry.balance (m_clock.now());
}
//--------------------------------------------------------------------------
@@ -568,11 +568,11 @@ public:
//--------------------------------------------------------------------------
void writeList (
clock_type::rep const now,
clock_type::time_point const now,
beast::PropertyStream::Set& items,
beast::List <Entry>& list)
EntryIntrusiveList& list)
{
for (beast::List <Entry>::iterator iter (list.begin());
for (EntryIntrusiveList::iterator iter (list.begin());
iter != list.end(); ++iter)
{
beast::PropertyStream::Map item (items);
@@ -587,7 +587,7 @@ public:
void onWrite (beast::PropertyStream::Map& map)
{
clock_type::rep const now (m_clock.elapsed());
clock_type::time_point const now (m_clock.now());
SharedState::Access state (m_state);

View File

@@ -26,10 +26,10 @@ class ManagerImp
: public Manager
, public beast::Thread
{
public:
private:
beast::Journal m_journal;
Logic m_logic;
public:
ManagerImp (beast::insight::Collector::ptr const& collector,
beast::Journal journal)
: Thread ("Resource::Manager")
@@ -63,7 +63,7 @@ public:
{
return m_logic.exportConsumers();
}
void importConsumers (std::string const& origin, Gossip const& gossip)
{
m_logic.importConsumers (origin, gossip);