Use aged containers in Validators module (RIPD-349)

This commit is contained in:
Scott Schurr
2014-08-06 14:24:42 -07:00
committed by Vinnie Falco
parent 5322955f2b
commit 019c1af435
17 changed files with 417 additions and 353 deletions

View File

@@ -67,7 +67,7 @@ struct Count
}
/** Output to PropertyStream. */
void onWrite (beast::PropertyStream::Map& map)
void onWrite (beast::PropertyStream::Map& map) const
{
map["received"] = received;
map["expected"] = expected;
@@ -81,14 +81,6 @@ struct Count
std::size_t closed; // Number of validations with closed ledgers
};
inline Count operator+ (Count const& lhs, Count const& rhs)
{
return Count (
lhs.received + rhs.received,
lhs.expected + rhs.expected,
lhs.closed + rhs.closed);
}
}
}

View File

@@ -20,11 +20,99 @@
#ifndef RIPPLE_VALIDATORS_LOGIC_H_INCLUDED
#define RIPPLE_VALIDATORS_LOGIC_H_INCLUDED
#include <beast/chrono/manual_clock.h>
#include <beast/container/aged_unordered_set.h>
#include <memory>
namespace ripple {
namespace Validators {
// Forward declare unit test so it can be a friend to LRUCache.
class Logic_test;
namespace detail
{
// The LRUCache class (ab)uses an aged_unordered_set so it can hold on
// to a limited number of values. When the container gets too full the
// LRUCache expires the oldest values.
//
// An aged_unordered_set gives us the functionality we want by keeping the
// chronological list. We don't care about the actual time of entry, only
// the time ordering. So we hook the aged_unordered_set up to a maunual_clock
// (which we never bother to increment).
//
// The implementation could potentially be changed to be time-based, rather
// than count-based, by hooking up a beast::basic_second_clock in place of the
// manual_clock and deleting a range of expired entries on insert.
//
template <class Key,
class Hash = std::hash <Key>,
class KeyEqual = std::equal_to <Key>,
class Allocator = std::allocator <Key> >
class LRUCache
{
private:
typedef std::chrono::seconds Duration;
typedef beast::manual_clock <Duration> Clock;
typedef beast::aged_unordered_set <
Key, Duration, Hash, KeyEqual, Allocator> ContainerType;
public:
LRUCache () = delete;
LRUCache (LRUCache const& lhs) = delete;
explicit LRUCache (
size_t item_max,
Hash hash = Hash(),
KeyEqual equal = KeyEqual(),
Allocator alloc = Allocator())
: m_clock ()
, m_cache (m_clock, hash, equal, alloc)
, m_item_max (item_max)
{
m_cache.reserve (m_item_max + 1);
}
LRUCache& operator= (LRUCache const& lhs) = delete;
// Add the entry. Remove the oldest entry if we went over our limit.
// Returns true on insertion (the entry was not already in the cache).
bool insert (Key const& key)
{
auto const insertRet (m_cache.insert (key));
if (insertRet.second == false)
{
// key is re-referenced. Mark it as MRU.
m_cache.touch (insertRet.first);
}
else if (m_cache.size () > m_item_max)
{
// Added key and cache is too big. Erase oldest element.
m_cache.erase (m_cache.chronological.begin ());
}
return insertRet.second;
}
size_t size ()
{
return m_cache.size();
}
Key const* oldest ()
{
return m_cache.empty() ? nullptr : &(*m_cache.chronological.begin());
}
private:
Clock m_clock;
ContainerType m_cache;
const size_t m_item_max;
};
} // namespace detail
//------------------------------------------------------------------------------
// Encapsulates the logic for creating the chosen validators.
// This is a separate class to facilitate the unit tests.
//
@@ -44,6 +132,7 @@ public:
beast::SharedPtr <Source> fetchSource;
};
private:
typedef beast::SharedData <State> SharedState;
SharedState m_state;
@@ -72,27 +161,28 @@ public:
// Filters duplicate validations
//
typedef CycledSet <ReceivedValidation,
ReceivedValidationHash,
ReceivedValidationKeyEqual> SeenValidations;
SeenValidations m_seenValidations;
typedef detail::LRUCache <ReceivedValidation,
ReceivedValidationHash,
ReceivedValidationKeyEqual> RecentValidations;
RecentValidations m_recentValidations;
// Filters duplicate ledger hashes
//
typedef CycledSet <RippleLedgerHash,
RippleLedgerHash::hasher,
RippleLedgerHash::key_equal> SeenLedgerHashes;
SeenLedgerHashes m_seenLedgerHashes;
typedef detail::LRUCache <RippleLedgerHash,
RippleLedgerHash::hasher,
RippleLedgerHash::key_equal> RecentLedgerHashes;
RecentLedgerHashes m_recentLedgerHashes;
//--------------------------------------------------------------------------
public:
explicit Logic (Store& store, beast::Journal journal = beast::Journal ())
: m_store (store)
, m_journal (journal)
, m_ledgerID (0)
, m_rebuildChosenList (false)
, m_seenValidations (seenValidationsCacheSize)
, m_seenLedgerHashes (seenLedgersCacheSize)
, m_recentValidations (recentValidationsCacheSize)
, m_recentLedgerHashes (recentLedgersCacheSize)
{
m_sources.reserve (16);
}
@@ -233,6 +323,18 @@ public:
return numRemoved;
}
/** Return reference to m_sources for Mangager::PropertyStream. */
SourceTable const& getSources ()
{
return m_sources;
}
/** Return reference to m_validators for Manager::PropertyStream. */
ValidatorTable const& getValidators ()
{
return m_validators;
}
//--------------------------------------------------------------------------
//
// Chosen
@@ -275,12 +377,10 @@ public:
}
}
/** Returns the current Chosen list.
This can be called from any thread at any time.
*/
ChosenList::Ptr getChosen ()
/** Returns number of elements in the current Chosen list. */
std::uint32_t getChosenSize()
{
return m_chosenList;
return m_chosenList ? m_chosenList->size() : 0;
}
//--------------------------------------------------------------------------
@@ -393,7 +493,7 @@ public:
{
std::size_t n (0);
beast::Time const currentTime (beast::Time::getCurrentTime ());
for (SourceTable::iterator iter = m_sources.begin ();
(n == 0) && iter != m_sources.end (); ++iter)
{
@@ -420,7 +520,7 @@ public:
}
//--------------------------------------------------------------------------
//
//
// Ripple interface
//
//--------------------------------------------------------------------------
@@ -434,10 +534,10 @@ public:
if (iter != m_validators.end ())
{
// Filter duplicates (defensive programming)
if (! m_seenValidations.insert (rv))
if (! m_recentValidations.insert (rv))
return;
iter->second.receiveValidation (rv.ledgerHash);
iter->second.on_validation (rv.ledgerHash);
m_journal.trace <<
"New trusted validation for " << rv.ledgerHash <<
@@ -456,7 +556,7 @@ public:
void ledgerClosed (RippleLedgerHash const& ledgerHash)
{
// Filter duplicates (defensive programming)
if (! m_seenLedgerHashes.insert (ledgerHash))
if (! m_recentLedgerHashes.insert (ledgerHash))
return;
++m_ledgerID;
@@ -466,10 +566,7 @@ public:
for (ValidatorTable::iterator iter (m_validators.begin());
iter != m_validators.end(); ++iter)
{
Validator& v (iter->second);
v.ledgerClosed (ledgerHash);
}
iter->second.on_ledger (ledgerHash);
}
// Returns `true` if the public key hash is contained in the Chosen List.

View File

@@ -93,7 +93,7 @@
* Measurements of constructive/destructive behavior is
calculated in units of percentage of ledgers for which
the behavior is measured.
What we want from the unique node list:
- Some number of trusted roots (known by domain)
probably organizations whose job is to provide a list of validators
@@ -151,8 +151,8 @@ public:
bool m_checkSources;
ManagerImp (
Stoppable& parent,
beast::File const& pathToDbFileOrDirectory,
Stoppable& parent,
beast::File const& pathToDbFileOrDirectory,
beast::Journal journal)
: Stoppable ("Validators::Manager", parent)
, Thread ("Validators")
@@ -232,14 +232,14 @@ public:
//--------------------------------------------------------------------------
void receiveValidation (ReceivedValidation const& rv)
void on_receive_validation (ReceivedValidation const& rv)
{
if (! isStopping())
m_queue.dispatch (m_context.wrap (std::bind (
&Logic::receiveValidation, &m_logic, rv)));
}
void ledgerClosed (RippleLedgerHash const& ledgerHash)
void on_ledger_closed (RippleLedgerHash const& ledgerHash)
{
if (! isStopping())
m_queue.dispatch (m_context.wrap (std::bind (
@@ -283,24 +283,21 @@ public:
{
Context::Scope scope (m_context);
map ["trusted"] = std::uint32_t (
m_logic.m_chosenList ?
m_logic.m_chosenList->size() : 0);
map ["trusted"] = m_logic.getChosenSize();
{
beast::PropertyStream::Set items ("sources", map);
for (Logic::SourceTable::const_iterator iter (m_logic.m_sources.begin());
iter != m_logic.m_sources.end(); ++iter)
items.add (iter->source->to_string());
for (auto const& entry : m_logic.getSources())
{
items.add (entry.source->to_string());
}
}
{
beast::PropertyStream::Set items ("validators", map);
for (Logic::ValidatorTable::iterator iter (m_logic.m_validators.begin());
iter != m_logic.m_validators.end(); ++iter)
for (auto const& entry : m_logic.getValidators())
{
RipplePublicKey const& publicKey (iter->first);
Validator const& validator (iter->second);
RipplePublicKey const& publicKey (entry.first);
Validator const& validator (entry.second);
beast::PropertyStream::Map item (items);
item["public_key"] = publicKey.to_string();
validator.count().onWrite (item);
@@ -317,7 +314,7 @@ public:
void init ()
{
beast::Error error (m_store.open (m_databaseFile));
if (! error)
{
m_logic.load ();
@@ -383,7 +380,7 @@ Manager::Manager ()
}
Validators::Manager* Validators::Manager::New (
beast::Stoppable& parent,
beast::Stoppable& parent,
beast::File const& pathToDbFileOrDirectory,
beast::Journal journal)
{

View File

@@ -184,6 +184,127 @@ public:
}
}
void testLRUCache ()
{
detail::LRUCache<std::string> testCache {3};
expect (testCache.size () == 0, "Wrong initial size");
struct TestValues
{
char const* const value;
bool const insertResult;
};
{
std::array <TestValues, 3> const v1 {
{{"A", true}, {"B", true}, {"C", true}}};
for (auto const& v : v1)
{
expect (testCache.insert (v.value) == v.insertResult,
"Failed first insert tests");
}
expect (testCache.size() == 3, "Unexpected intermediate size");
expect (*testCache.oldest() == "A", "Unexpected oldest member");
}
{
std::array <TestValues, 3> const v2 {
{{"A", false}, {"D", true}, {"C", false}}};
for (auto const& v : v2)
{
expect (testCache.insert (v.value) == v.insertResult,
"Failed second insert tests");
}
expect (testCache.size() == 3, "Unexpected final size");
expect (*testCache.oldest() == "A",
"Unexpected oldest member");
}
}
void testValidator ()
{
int receivedCount = 0;
int expectedCount = 0;
int closedCount = 0;
// Lambda as local function
auto updateCounts = [&](bool received, bool validated)
{
bool const sent = received || validated;
receivedCount += sent && !validated ? 1 : 0;
expectedCount += sent && !received ? 1 : 0;
closedCount += validated && received ? 1 : 0;
};
auto checkCounts = [&] (Count const& count)
{
// std::cout << "Received actual: " << count.received << " expected: " << receivedCount << std::endl;
// std::cout << "Expected actual: " << count.expected << " expected: " << expectedCount << std::endl;
// std::cout << "Closed actual: " << count.closed << " expected: " << closedCount << std::endl;
expect (count.received == receivedCount, "Bad received count");
expect (count.expected == expectedCount, "Bad expected count");
expect (count.closed == closedCount, "Bad closed count");
};
Validator validator;
std::uint64_t i = 1;
// Received before closed
for (; i <= ledgersPerValidator; ++i)
{
RippleLedgerHash const hash {i};
bool const received = (i % 13 != 0);
bool const validated = (i % 7 != 0);
updateCounts (received, validated);
if (received)
validator.on_validation (hash);
if (validated)
validator.on_ledger (hash);
}
checkCounts (validator.count ());
// Closed before received
for (; i <= ledgersPerValidator * 2; ++i)
{
RippleLedgerHash const hash {i};
bool const received = (i % 11 != 0);
bool const validated = (i % 17 != 0);
updateCounts (received, validated);
if (validated)
validator.on_ledger (hash);
if (received)
validator.on_validation (hash);
}
checkCounts (validator.count ());
{
// Repeated receives
RippleLedgerHash const hash {++i};
receivedCount += 1;
for (auto j = 0; j < 100; ++j)
{
validator.on_validation (hash);
}
}
checkCounts (validator.count ());
{
// Repeated closes
RippleLedgerHash const hash {++i};
expectedCount += 1;
for (auto j = 0; j < 100; ++j)
{
validator.on_ledger (hash);
}
}
checkCounts (validator.count ());
}
void testLogic ()
{
//TestStore store;
@@ -206,7 +327,7 @@ public:
logic.fetch_one ();
ChosenList::Ptr list (logic.getChosen ());
// auto chosenSize (logic.getChosenSize ());
pass ();
}
@@ -214,6 +335,8 @@ public:
void
run ()
{
testLRUCache ();
testValidator ();
testLogic ();
}
};

View File

@@ -20,10 +20,6 @@
#ifndef RIPPLE_VALIDATORS_TUNING_H_INCLUDED
#define RIPPLE_VALIDATORS_TUNING_H_INCLUDED
#include <ripple/common/UnorderedContainers.h>
#include <boost/version.hpp>
namespace ripple {
namespace Validators {
@@ -43,115 +39,16 @@ enum
#endif
// This tunes the preallocated arrays
,expectedNumberOfResults = 1000
,expectedNumberOfResults = 1000
// NUmber of entries in the seen validations cache
,seenValidationsCacheSize = 1000
// Number of entries in the recent validations cache
,recentValidationsCacheSize = 1000
// Number of entries in the seen ledgers cache
,seenLedgersCacheSize = 1000 // about half an hour at 2/sec
// Number of entries in the recent ledgers cache
,recentLedgersCacheSize = 1000 // about half an hour at 2/sec
// Number of closed Ledger entries per Validator
,ledgersPerValidator = 100 // this shouldn't be too large
};
//------------------------------------------------------------------------------
/** Cycled associative map of unique keys. */
template <class Key,
class T,
class Info, // per-container info
class Hash = typename Key::hasher,
class KeyEqual = std::equal_to <Key>,
class Allocator = std::allocator <std::pair <Key const, T> > >
class CycledMap
{
private:
typedef hash_map <Key, T, Hash, KeyEqual, Allocator> ContainerType;
typedef typename ContainerType::iterator iterator;
public:
typedef typename ContainerType::key_type key_type;
typedef typename ContainerType::value_type value_type;
typedef typename ContainerType::size_type size_type;
typedef typename ContainerType::difference_type difference_type;
typedef typename ContainerType::hasher hasher;
typedef typename ContainerType::key_equal key_equal;
typedef typename ContainerType::allocator_type allocator_type;
typedef typename ContainerType::reference reference;
typedef typename ContainerType::const_reference const_reference;
typedef typename ContainerType::pointer pointer;
typedef typename ContainerType::const_pointer const_pointer;
explicit CycledMap (
size_type item_max,
Hash hash = Hash(),
KeyEqual equal = KeyEqual(),
Allocator alloc = Allocator())
: m_max (item_max)
, m_hash (hash)
, m_equal (equal)
, m_alloc (alloc)
, m_front (m_max, hash, equal, alloc)
, m_back (m_max, hash, equal, alloc)
{
}
Info& front()
{ return m_front_info; }
Info const & front() const
{ return m_front_info; }
Info& back ()
{ return m_back_info; }
Info const& back () const
{ return m_back_info; }
/** Returns `true` if the next real insert would swap. */
bool full() const
{
return m_front.size() >= m_max;
}
/** Insert the value if it doesn't already exist. */
std::pair <T&, Info&> insert (value_type const& value)
{
if (full())
cycle ();
iterator iter (m_back.find (value.first));
if (iter != m_back.end())
return std::make_pair (
std::ref (iter->second),
std::ref (m_back_info));
std::pair <iterator, bool> result (
m_front.insert (value));
return std::make_pair (
std::ref (result.first->second),
std::ref (m_front_info));
}
void cycle ()
{
std::swap (m_front, m_back);
m_front.clear ();
#if BOOST_VERSION > 105400
m_front.reserve (m_max);
#endif
std::swap (m_front_info, m_back_info);
m_front_info.clear();
}
private:
size_type m_max;
hasher m_hash;
key_equal m_equal;
allocator_type m_alloc;
ContainerType m_front;
ContainerType m_back;
Info m_front_info;
Info m_back_info;
,ledgersPerValidator = 100 // this shouldn't be too large
};
}

View File

@@ -20,6 +20,11 @@
#ifndef RIPPLE_VALIDATORS_VALIDATOR_H_INCLUDED
#define RIPPLE_VALIDATORS_VALIDATOR_H_INCLUDED
#include <ripple/common/seconds_clock.h>
#include <beast/container/aged_unordered_map.h>
#include <beast/container/aged_map.h>
#include <beast/container/aged_container_utility.h>
namespace ripple {
namespace Validators {
@@ -27,83 +32,132 @@ namespace Validators {
class Validator
{
private:
/** State of a ledger. */
struct Ledger
// State of a ledger.
struct Entry
{
Ledger() : closed (false), received (false)
{ }
bool closed; // `true` if the ledger was closed
bool received; // `true` if we got a validation
bool closed = false; // `true` if the ledger was closed
bool received = false; // `true` if we got a validation
};
/** Number of sources that reference this validator. */
int m_refCount;
// Holds the Entry of all recent ledgers for this validator.
#if 1
typedef beast::aged_unordered_map <RippleLedgerHash, Entry,
std::chrono::seconds,
//beast::hardened_hash<>,
std::hash<RippleLedgerHash>,
RippleLedgerHash::key_equal> Table;
#else
typedef beast::aged_map <RippleLedgerHash, Entry,
std::chrono::seconds, std::less<>> Table;
#endif
/** Holds the state of all recent ledgers for this validator. */
/** @{ */
typedef CycledMap <RippleLedgerHash, Ledger, Count, beast::hardened_hash<>,
RippleLedgerHash::key_equal> LedgerMap;
LedgerMap m_ledgers;
/** @} */
int refs_; // Number of sources that reference this validator.
Table table_;
Count count_;
public:
Validator ()
: m_refCount (0)
, m_ledgers (ledgersPerValidator)
Validator()
: refs_ (0)
, table_ (get_seconds_clock ())
{
}
/** Increment the number of references to this validator. */
void addRef ()
{ ++m_refCount; }
void
addRef()
{
++refs_;
}
/** Decrement the number of references to this validator.
When the reference count reaches zero, the validator will
be removed and no longer tracked.
*/
bool release ()
{ return (--m_refCount) == 0; }
bool
release()
{
return (--refs_) == 0;
}
size_t
size () const
{
return table_.size ();
}
/** Returns the composite performance statistics. */
Count count () const
{ return m_ledgers.front() + m_ledgers.back(); }
Count const&
count () const
{
return count_;
}
/** Called upon receipt of a validation. */
void receiveValidation (RippleLedgerHash const& ledgerHash)
void
on_validation (RippleLedgerHash const& ledgerHash)
{
std::pair <Ledger&, Count&> result (m_ledgers.insert (
std::make_pair (ledgerHash, Ledger())));
Ledger& ledger (result.first);
Count& count (result.second);
ledger.received = true;
if (ledger.closed)
auto const result (table_.insert (
std::make_pair (ledgerHash, Entry())));
auto& entry (result.first->second);
if (entry.received)
return;
entry.received = true;
if (entry.closed)
{
--count.expected;
++count.closed;
--count_.expected;
++count_.closed;
//table_.erase (result.first);
}
else
{
++count.received;
++count_.received;
}
//expire();
}
/** Called when a ledger is closed. */
void ledgerClosed (RippleLedgerHash const& ledgerHash)
void
on_ledger (RippleLedgerHash const& ledgerHash)
{
std::pair <Ledger&, Count&> result (m_ledgers.insert (
std::make_pair (ledgerHash, Ledger())));
Ledger& ledger (result.first);
Count& count (result.second);
ledger.closed = true;
if (ledger.received)
auto const result (table_.insert (
std::make_pair (ledgerHash, Entry())));
auto& entry (result.first->second);
if (entry.closed)
return;
entry.closed = true;
if (entry.received)
{
--count.received;
++count.closed;
--count_.received;
++count_.closed;
//table_.erase (result.first);
}
else
{
++count.expected;
++count_.expected;
}
//expire();
}
/** Prunes old entries. */
void
expire()
{
beast::expire (table_, std::chrono::minutes(5));
}
private:
void dump ()
{
std::cout << "Validator: " << this << std::endl;
std::cout << "Size: " << table_.size() << std::endl;
std::cout << "end at: " << &(*table_.end()) << std::endl;
for (auto const& ledgerKeyAndState : table_)
{
std::cout << "keyAndState at: " << &ledgerKeyAndState.first << std::endl;
std::cout << " Hash: " << ledgerKeyAndState.first << std::endl;
std::cout << " closed: " << ledgerKeyAndState.second.closed <<
" received: " << ledgerKeyAndState.second.received <<
std::endl;
}
}
};