diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj index dd23303f96..78ce2ff170 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj +++ b/Builds/VisualStudio2013/RippleD.vcxproj @@ -103,6 +103,12 @@ true + + true + true + true + true + true true @@ -115,6 +121,12 @@ true true + + true + true + true + true + true true @@ -133,6 +145,24 @@ true true + + true + true + true + true + + + true + true + true + true + + + true + true + true + true + true true @@ -2235,16 +2265,17 @@ + - + + - - + diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters index 06fa9c29b9..ec16d1dc9e 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters @@ -1473,6 +1473,21 @@ [2] Old Ripple\ripple_overlay + + [1] Ripple\peerfinder\impl + + + [1] Ripple\peerfinder\impl + + + [1] Ripple\peerfinder\impl + + + [1] Ripple\peerfinder\impl + + + [1] Ripple\peerfinder\impl + @@ -2913,9 +2928,6 @@ [1] Ripple\peerfinder\impl - - [1] Ripple\peerfinder\impl - [1] Ripple\peerfinder\impl @@ -2931,12 +2943,6 @@ [1] Ripple\peerfinder\impl - - [1] Ripple\peerfinder\impl - - - [1] Ripple\peerfinder\impl - [1] Ripple\peerfinder\impl @@ -2946,9 +2952,6 @@ [1] Ripple\peerfinder\impl - - [1] Ripple\peerfinder\impl - [1] Ripple\peerfinder\impl @@ -3036,6 +3039,21 @@ [2] Old Ripple\ripple_overlay + + [1] Ripple\peerfinder\impl + + + [1] Ripple\peerfinder\impl + + + [1] Ripple\peerfinder\impl + + + [1] Ripple\peerfinder\impl + + + [1] Ripple\peerfinder\impl + diff --git a/src/ripple/peerfinder/README.md b/src/ripple/peerfinder/README.md index 6bda585976..0924256987 100644 --- a/src/ripple/peerfinder/README.md +++ b/src/ripple/peerfinder/README.md @@ -260,7 +260,7 @@ Slot properties may be combined and are not mutually exclusive. configuration file or learned through overlay messages from other trusted peers. Cluster slots do not count towards connection limits. -* **Superpeer** (2.0) +* **Superpeer** (forthcoming) A superpeer slot is a connection to a peer which can accept incoming connections, meets certain resource availaibility requirements (such as diff --git a/src/ripple/peerfinder/api/Endpoint.h b/src/ripple/peerfinder/api/Endpoint.h index 77edea6ca7..37229b6339 100644 --- a/src/ripple/peerfinder/api/Endpoint.h +++ b/src/ripple/peerfinder/api/Endpoint.h @@ -27,11 +27,15 @@ namespace PeerFinder { struct Endpoint { Endpoint (); - + + Endpoint (IP::Endpoint const& ep, int hops_); + int hops; IP::Endpoint address; }; +bool operator< (Endpoint const& lhs, Endpoint const& rhs); + } } diff --git a/src/ripple/peerfinder/impl/Bootcache.cpp b/src/ripple/peerfinder/impl/Bootcache.cpp new file mode 100644 index 0000000000..06aceb50c5 --- /dev/null +++ b/src/ripple/peerfinder/impl/Bootcache.cpp @@ -0,0 +1,267 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "Bootcache.h" + +namespace ripple { +namespace PeerFinder { + +Bootcache::Bootcache ( + Store& store, + clock_type& clock, + Journal journal) + : m_store (store) + , m_clock (clock) + , m_journal (journal) + , m_whenUpdate (m_clock.now ()) +{ +} + +Bootcache::~Bootcache () +{ + update(); +} + +bool +Bootcache::empty() const +{ + return m_map.empty(); +} + +Bootcache::map_type::size_type +Bootcache::size() const +{ + return m_map.size(); +} + +Bootcache::const_iterator +Bootcache::begin() const +{ + return const_iterator (m_map.right.begin()); +} + +Bootcache::const_iterator +Bootcache::cbegin() const +{ + return const_iterator (m_map.right.begin()); +} + +Bootcache::const_iterator +Bootcache::end() const +{ + return const_iterator (m_map.right.end()); +} + +Bootcache::const_iterator +Bootcache::cend() const +{ + return const_iterator (m_map.right.end()); +} + +void +Bootcache::clear() +{ + m_map.clear(); + m_needsUpdate = true; +} + +//-------------------------------------------------------------------------- + +void +Bootcache::load () +{ + clear(); + auto const n (m_store.load ( + [this](IP::Endpoint const& endpoint, int valence) + { + auto const result (this->m_map.insert ( + value_type (endpoint, valence))); + if (! result.second) + { + if (this->m_journal.error) + this->m_journal.error << leftw (18) << + "Bootcache discard " << endpoint; + } + })); + + if (n > 0) + { + if (m_journal.info) m_journal.info << leftw (18) << + "Bootcache loaded " << n << + ((n > 1) ? " addresses" : " address"); + prune (); + } +} + +bool +Bootcache::insert (IP::Endpoint const& endpoint) +{ + auto const result (m_map.insert ( + value_type (endpoint, 0))); + if (result.second) + { + if (m_journal.trace) m_journal.trace << leftw (18) << + "Bootcache insert " << endpoint; + prune (); + flagForUpdate(); + } + return result.second; +} + +void +Bootcache::on_success (IP::Endpoint const& endpoint) +{ + auto result (m_map.insert ( + value_type (endpoint, 1))); + if (result.second) + { + prune (); + } + else + { + Entry entry (result.first->right); + if (entry.valence() < 0) + entry.valence() = 0; + ++entry.valence(); + m_map.erase (result.first); + result = m_map.insert ( + value_type (endpoint, entry)); + assert (result.second); + } + Entry const& entry (result.first->right); + if (m_journal.info) m_journal.info << leftw (18) << + "Bootcache connect " << endpoint << + " with " << entry.valence() << + ((entry.valence() > 1) ? " successes" : " success"); + flagForUpdate(); +} + +void +Bootcache::on_failure (IP::Endpoint const& endpoint) +{ + auto result (m_map.insert ( + value_type (endpoint, -1))); + if (result.second) + { + prune(); + } + else + { + Entry entry (result.first->right); + if (entry.valence() > 0) + entry.valence() = 0; + --entry.valence(); + m_map.erase (result.first); + result = m_map.insert ( + value_type (endpoint, entry)); + assert (result.second); + } + Entry const& entry (result.first->right); + auto const n (std::abs (entry.valence())); + if (m_journal.debug) m_journal.debug << leftw (18) << + "Bootcache failed " << endpoint << + " with " << n << + ((n > 1) ? " attempts" : " attempt"); + flagForUpdate(); +} + +void +Bootcache::periodicActivity () +{ + checkUpdate(); +} + +//-------------------------------------------------------------------------- + +void +Bootcache::onWrite (PropertyStream::Map& map) +{ + map ["entries"] = uint32 (m_map.size()); +} + + // Checks the cache size and prunes if its over the limit. +void +Bootcache::prune () +{ + if (size() <= Tuning::bootcacheSize) + return; + + // Calculate the amount to remove + auto count ((size() * + Tuning::bootcachePrunePercent) / 100); + decltype(count) pruned (0); + + // Work backwards because bimap doesn't handle + // erasing using a reverse iterator very well. + // + for (auto iter (m_map.right.end()); + count-- > 0 && iter != m_map.right.begin(); ++pruned) + { + --iter; + IP::Endpoint const& endpoint (iter->get_left()); + Entry const& entry (iter->get_right()); + if (m_journal.trace) m_journal.trace << leftw (18) << + "Bootcache pruned" << endpoint << + " at valence " << entry.valence(); + iter = m_map.right.erase (iter); + } + + if (m_journal.debug) m_journal.debug << leftw (18) << + "Bootcache pruned " << pruned << " entries total"; +} + +// Updates the Store with the current set of entries if needed. +void +Bootcache::update () +{ + if (! m_needsUpdate) + return; + std::vector list; + list.reserve (m_map.size()); + for (auto const& e : m_map) + { + Store::Entry se; + se.endpoint = e.get_left(); + se.valence = e.get_right().valence(); + list.push_back (se); + } + m_store.save (list); + // Reset the flag and cooldown timer + m_needsUpdate = false; + m_whenUpdate = m_clock.now() + Tuning::bootcacheCooldownTime; +} + +// Checks the clock and calls update if we are off the cooldown. +void +Bootcache::checkUpdate () +{ + if (m_needsUpdate && m_whenUpdate < m_clock.now()) + update (); +} + +// Called when changes to an entry will affect the Store. +void +Bootcache::flagForUpdate () +{ + m_needsUpdate = true; + checkUpdate (); +} + +} +} diff --git a/src/ripple/peerfinder/impl/Bootcache.h b/src/ripple/peerfinder/impl/Bootcache.h index f7a97dc5d0..3e6df6988a 100644 --- a/src/ripple/peerfinder/impl/Bootcache.h +++ b/src/ripple/peerfinder/impl/Bootcache.h @@ -20,6 +20,10 @@ #ifndef RIPPLE_PEERFINDER_BOOTCACHE_H_INCLUDED #define RIPPLE_PEERFINDER_BOOTCACHE_H_INCLUDED +#include +#include +#include + namespace ripple { namespace PeerFinder { @@ -29,13 +33,7 @@ namespace PeerFinder { connections are needed. Along with the address, each entry has this additional metadata: - Uptime - - The number of seconds that the address has maintained an active - peer connection, cumulative, without a connection attempt failure. - Valence - A signed integer which represents the number of successful consecutive connection attempts when positive, and the number of failed consecutive connection attempts when negative. @@ -46,33 +44,18 @@ namespace PeerFinder { */ class Bootcache { -public: - /** An item used for connecting. */ - class Endpoint +private: + class Entry { public: - Endpoint () - : m_uptime (0) - , m_valence (0) + Entry (int valence) + : m_valence (valence) { } - Endpoint (IP::Endpoint const& address, - std::chrono::seconds uptime, int valence) - : m_address (address) - , m_uptime (uptime) - , m_valence (valence) + int& valence () { - } - - IP::Endpoint const& address () const - { - return m_address; - } - - std::chrono::seconds uptime () const - { - return m_uptime; + return m_valence; } int valence () const @@ -80,106 +63,40 @@ public: return m_valence; } - private: - IP::Endpoint m_address; - std::chrono::seconds m_uptime; - int m_valence; - }; - - typedef std::vector Endpoints; - - //-------------------------------------------------------------------------- - - /** An entry in the bootstrap cache. */ - struct Entry - { - Entry () - : cumulativeUptime (0) - , sessionUptime (0) - , connectionValence (0) - , active (false) + friend bool operator< (Entry const& lhs, Entry const& rhs) { - } - - /** Update the uptime measurement based on the time. */ - void update (clock_type::time_point const& now) - { - // Must be active! - assert (active); - // Clock must be monotonically increasing - assert (now >= whenActive); - // Remove the uptime we added earlier in the - // session and add back in the new uptime measurement. - auto const uptime (now - whenActive); - cumulativeUptime -= sessionUptime; - cumulativeUptime += uptime; - sessionUptime = uptime; - } - - /** Our cumulative uptime with this address with no failures. */ - std::chrono::seconds cumulativeUptime; - - /** Amount of uptime from the current session (if any). */ - std::chrono::seconds sessionUptime; - - /** Number of consecutive connection successes or failures. - If the number is positive, indicates the number of - consecutive successful connection attempts, else the - absolute value indicates the number of consecutive - connection failures. - */ - int connectionValence; - - /** `true` if the peer has handshaked and is currently connected. */ - bool active; - - /** Time when the peer became active. */ - clock_type::time_point whenActive; - }; - - //-------------------------------------------------------------------------- - - /* Comparison function for entries. - - 1. Sort descending by cumulative uptime - 2. For all uptimes == 0, - Sort descending by connection successes - 3. For all successes == 0 - Sort ascending by number of failures - */ - struct Less - { - template - bool operator() ( - Iter const& lhs_iter, Iter const& rhs_iter) - { - Entry const& lhs (lhs_iter->second); - Entry const& rhs (rhs_iter->second); - // Higher cumulative uptime always wins - if (lhs.cumulativeUptime > rhs.cumulativeUptime) - return true; - else if (lhs.cumulativeUptime <= rhs.cumulativeUptime - && rhs.cumulativeUptime.count() != 0) - return false; - // At this point both uptimes will be zero - consistency_check (lhs.cumulativeUptime.count() == 0 && - rhs.cumulativeUptime.count() == 0); - if (lhs.connectionValence > rhs.connectionValence) + if (lhs.valence() > rhs.valence()) return true; return false; } + + private: + int m_valence; }; - //-------------------------------------------------------------------------- + typedef boost::bimaps::unordered_set_of left_t; + typedef boost::bimaps::multiset_of right_t; + typedef boost::bimap map_type; + typedef map_type::value_type value_type; - typedef std::unordered_map Entries; + struct Transform : std::unary_function < + map_type::right_map::const_iterator::value_type const&, + IP::Endpoint const&> + { + IP::Endpoint const& operator() ( + map_type::right_map:: + const_iterator::value_type const& v) const + { + return v.get_left(); + } + }; - typedef std::vector SortedEntries; +private: + map_type m_map; Store& m_store; clock_type& m_clock; Journal m_journal; - Entries m_entries; // Time after which we can update the database again clock_type::time_point m_whenUpdate; @@ -187,367 +104,57 @@ public: // Set to true when a database update is needed bool m_needsUpdate; +public: + typedef boost::transform_iterator iterator; + + typedef iterator const_iterator; + Bootcache ( Store& store, clock_type& clock, - Journal journal) - : m_store (store) - , m_clock (clock) - , m_journal (journal) - , m_whenUpdate (m_clock.now ()) - { - } + Journal journal); - ~Bootcache () - { - update (); - } + ~Bootcache (); - //-------------------------------------------------------------------------- - - /** Load the persisted data from the Store into the container. */ - void load () - { - typedef std::vector StoredData; - StoredData const list (m_store.loadBootstrapCache ()); - - std::size_t count (0); - - for (StoredData::const_iterator iter (list.begin()); - iter != list.end(); ++iter) - { - std::pair result ( - m_entries.emplace (std::piecewise_construct, - std::forward_as_tuple (iter->address), - std::make_tuple ())); - if (result.second) - { - ++count; - Entry& entry (result.first->second); - entry.cumulativeUptime = iter->cumulativeUptime; - entry.connectionValence = iter->connectionValence; - } - else - { - if (m_journal.error) m_journal.error << leftw (18) << - "Bootcache discard " << iter->address; - } - } - - if (count > 0) - { - if (m_journal.info) m_journal.info << leftw (18) << - "Bootcache loaded " << count << - ((count > 1) ? " addresses" : " address"); - } - - prune (); - } + /** Returns `true` if the cache is empty. */ + bool empty() const; /** Returns the number of entries in the cache. */ - std::size_t size () const - { - return m_entries.size(); - } + map_type::size_type size() const; - /** Returns up to the specified number of the best addresses. */ - IPAddresses getAddresses (int n) - { - SortedEntries const list (sort()); - IPAddresses result; - int count (0); - result.reserve (n); - for (SortedEntries::const_iterator iter ( - list.begin()); ++count <= n && iter != list.end(); ++iter) - result.push_back ((*iter)->first); - consistency_check (result.size() <= n); - return result; - } + /** IP::Endpoint iterators that traverse in decreasing valence. */ + /** @{ */ + const_iterator begin() const; + const_iterator cbegin() const; + const_iterator end() const; + const_iterator cend() const; + void clear(); + /** @} */ - /** Returns all entries in the cache. */ - Endpoints fetch () const - { - Endpoints result; - result.reserve (m_entries.size ()); - for (Entries::const_iterator iter (m_entries.begin ()); - iter != m_entries.end (); ++iter) - result.emplace_back (iter->first, - iter->second.cumulativeUptime, - iter->second.connectionValence); - return result; - } + /** Load the persisted data from the Store into the container. */ + void load (); - /** Called periodically to perform time related tasks. */ - void periodicActivity () - { - checkUpdate(); - } - - /** Called when an address is learned from a message. */ - bool insert (IP::Endpoint const& address) - { - std::pair result ( - m_entries.emplace (std::piecewise_construct, - std::forward_as_tuple (address), - std::make_tuple ())); - if (result.second) - { - if (m_journal.trace) m_journal.trace << leftw (18) << - "Bootcache insert " << address; - prune (); - flagForUpdate(); - } - return result.second; - } - - /** Called when an outbound connection attempt fails to handshake. */ - void onConnectionFailure (IP::Endpoint const& address) - { - Entries::iterator iter (m_entries.find (address)); - // If the entry doesn't already exist don't bother remembering - // it since the connection failed. - // - if (iter == m_entries.end()) - return; - Entry& entry (iter->second); - // Reset cumulative uptime to zero. We are aggressive - // with resetting uptime to prevent the entire network - // from settling on just a handful of addresses. - // - entry.cumulativeUptime = std::chrono::seconds (0); - entry.sessionUptime = std::chrono::seconds (0); - // Increment the number of consecutive failures. - if (entry.connectionValence > 0) - entry.connectionValence = 0; - --entry.connectionValence; - int const count (std::abs (entry.connectionValence)); - if (m_journal.debug) m_journal.debug << leftw (18) << - "Bootcache failed " << address << - " with " << count << - ((count > 1) ? " attempts" : " attempt"); - flagForUpdate(); - } + /** Add the address to the cache. */ + bool insert (IP::Endpoint const& endpoint); /** Called when an outbound connection handshake completes. */ - void onConnectionHandshake (IP::Endpoint const& address, - HandshakeAction action) - { - std::pair result ( - m_entries.emplace (std::piecewise_construct, - std::forward_as_tuple (address), - std::make_tuple ())); - Entry& entry (result.first->second); - // Can't already be active! - consistency_check (! entry.active); - // Reset session uptime - entry.sessionUptime = std::chrono::seconds (0); - // Count this as a connection success - if (entry.connectionValence < 0) - entry.connectionValence = 0; - ++entry.connectionValence; - // Update active status - if (action == doActivate) - { - entry.active = true; - entry.whenActive = m_clock.now(); - } - else - { - entry.active = false; - } - // Prune if we made the container larger - if (result.second) - prune (); - flagForUpdate(); - if (m_journal.info) m_journal.info << leftw (18) << - "Bootcache connect " << address << - " with " << entry.connectionValence << - ((entry.connectionValence > 1) ? " successes" : " success"); - } + void on_success (IP::Endpoint const& endpoint); - /** Called periodically while the peer is active. */ - // - // VFALCO TODO Can't we just put the active ones into an intrusive list - // and update their uptime in periodicActivity() now that - // we have the m_clock member? - // - void onConnectionActive (IP::Endpoint const& address) - { - std::pair result ( - m_entries.emplace (std::piecewise_construct, - std::forward_as_tuple (address), - std::make_tuple ())); - // Must exist! - consistency_check (! result.second); - Entry& entry (result.first->second); - entry.update (m_clock.now()); - flagForUpdate(); - } + /** Called when an outbound connection attempt fails to handshake. */ + void on_failure (IP::Endpoint const& endpoint); - template - static std::string uptime_phrase ( - std::chrono::duration const& elapsed) - { - if (elapsed.count() > 0) - { - std::stringstream ss; - ss << " with " << elapsed << " uptime"; - return ss.str(); - } - return std::string (); - } - /** Called when an active outbound connection closes. */ - void onConnectionClosed (IP::Endpoint const& address) - { - Entries::iterator iter (m_entries.find (address)); - // Must exist! - consistency_check (iter != m_entries.end()); - Entry& entry (iter->second); - // Must be active! - consistency_check (entry.active); - if (m_journal.trace) m_journal.trace << leftw (18) << - "Bootcache close " << address << - uptime_phrase (entry.cumulativeUptime); - entry.update (m_clock.now()); - entry.sessionUptime = std::chrono::seconds (0); - entry.active = false; - flagForUpdate(); - } + /** Stores the cache in the persistent database on a timer. */ + void periodicActivity (); - //-------------------------------------------------------------------------- - // - // Diagnostics - // - //-------------------------------------------------------------------------- + /** Write the cache state to the property stream. */ + void onWrite (PropertyStream::Map& map); - void onWrite (PropertyStream::Map& map) - { - map ["entries"] = uint32(m_entries.size()); - } - - static std::string valenceString (int valence) - { - std::stringstream ss; - if (valence >= 0) - ss << '+'; - ss << valence; - return ss.str(); - } - - void dump (Journal::ScopedStream const& ss) const - { - std::vector const list (csort ()); - ss << std::endl << std::endl << - "Bootcache (size " << list.size() << ")"; - for (std::vector ::const_iterator iter ( - list.begin()); iter != list.end(); ++iter) - { - ss << std::endl << - (*iter)->first << ", " << - (*iter)->second.cumulativeUptime << ", " - << valenceString ((*iter)->second.connectionValence); - if ((*iter)->second.active) - ss << - ", active"; - } - } - - //-------------------------------------------------------------------------- private: - // Returns a vector of entry iterators sorted by descending score - std::vector csort () const - { - std::vector result; - result.reserve (m_entries.size()); - for (Entries::const_iterator iter (m_entries.begin()); - iter != m_entries.end(); ++iter) - result.push_back (iter); - std::random_shuffle (result.begin(), result.end()); - // should be std::unstable_sort (c++11) - std::sort (result.begin(), result.end(), Less()); - return result; - } - - // Returns a vector of entry iterators sorted by descending score - std::vector sort () - { - std::vector result; - result.reserve (m_entries.size()); - for (Entries::iterator iter (m_entries.begin()); - iter != m_entries.end(); ++iter) - result.push_back (iter); - std::random_shuffle (result.begin(), result.end()); - // should be std::unstable_sort (c++11) - std::sort (result.begin(), result.end(), Less()); - return result; - } - - // Checks the cache size and prunes if its over the limit. - void prune () - { - if (m_entries.size() <= Tuning::bootcacheSize) - return; - // Calculate the amount to remove - int count ((m_entries.size() * - Tuning::bootcachePrunePercent) / 100); - int pruned (0); - SortedEntries list (sort ()); - for (SortedEntries::const_reverse_iterator iter ( - list.rbegin()); count > 0 && iter != list.rend(); ++iter) - { - Entry& entry ((*iter)->second); - // skip active entries - if (entry.active) - continue; - if (m_journal.trace) m_journal.trace << leftw (18) << - "Bootcache pruned" << (*iter)->first << - uptime_phrase (entry.cumulativeUptime) << - " and valence " << entry.connectionValence; - m_entries.erase (*iter); - --count; - ++pruned; - } - - if (m_journal.debug) m_journal.debug << leftw (18) << - "Bootcache pruned " << pruned << " entries total"; - } - - // Updates the Store with the current set of entries if needed. - void update () - { - if (! m_needsUpdate) - return; - typedef std::vector StoredData; - StoredData list; - list.reserve (m_entries.size()); - for (Entries::const_iterator iter (m_entries.begin()); - iter != m_entries.end(); ++iter) - { - Store::SavedBootstrapAddress entry; - entry.address = iter->first; - entry.cumulativeUptime = iter->second.cumulativeUptime; - entry.connectionValence = iter->second.connectionValence; - list.push_back (entry); - } - m_store.updateBootstrapCache (list); - // Reset the flag and cooldown timer - m_needsUpdate = false; - m_whenUpdate = m_clock.now() + Tuning::bootcacheCooldownTime; - } - - // Checks the clock and calls update if we are off the cooldown. - void checkUpdate () - { - if (m_needsUpdate && m_whenUpdate < m_clock.now()) - update (); - } - - // Called when changes to an entry will affect the Store. - void flagForUpdate () - { - m_needsUpdate = true; - checkUpdate (); - } + void prune (); + void update (); + void checkUpdate (); + void flagForUpdate (); }; } diff --git a/src/ripple/peerfinder/impl/ConnectHandouts.cpp b/src/ripple/peerfinder/impl/ConnectHandouts.cpp new file mode 100644 index 0000000000..52b3bb1506 --- /dev/null +++ b/src/ripple/peerfinder/impl/ConnectHandouts.cpp @@ -0,0 +1,64 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "ConnectHandouts.h" + +namespace ripple { +namespace PeerFinder { + +ConnectHandouts::ConnectHandouts ( + std::size_t needed, Squelches& squelches) + : m_needed (needed) + , m_squelches (squelches) +{ + m_list.reserve (needed); +} + +bool +ConnectHandouts::try_insert (IP::Endpoint const& endpoint) +{ + if (full ()) + return false; + + // Make sure the address isn't already in our list + if (std::any_of (m_list.begin(), m_list.end(), + [&endpoint](IP::Endpoint const& other) + { + // Ignore port for security reasons + return other.address() == + endpoint.address(); + })) + { + return false; + } + + // Add to squelch list so we don't try it too often. + // If its already there, then make try_insert fail. + auto const result (m_squelches.insert ( + endpoint.address())); + if (! result.second) + return false; + + m_list.push_back (endpoint); + + return true; +} + +} +} diff --git a/src/ripple/peerfinder/impl/ConnectHandouts.h b/src/ripple/peerfinder/impl/ConnectHandouts.h new file mode 100644 index 0000000000..06d4c60125 --- /dev/null +++ b/src/ripple/peerfinder/impl/ConnectHandouts.h @@ -0,0 +1,79 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_PEERFINDER_CONNECTHANDOUTS_H_INCLUDED +#define RIPPLE_PEERFINDER_CONNECTHANDOUTS_H_INCLUDED + +#include "Tuning.h" + +#include "../../../beast/beast/container/aged_set.h" + +namespace ripple { +namespace PeerFinder { + +/** Receives handouts for making automatic connections. */ +class ConnectHandouts +{ +public: + // Keeps track of addresses we have made outgoing connections + // to, for the purposes of not connecting to them too frequently. + typedef beast::aged_set Squelches; + + typedef std::vector list_type; + +private: + std::size_t m_needed; + Squelches& m_squelches; + list_type m_list; + +public: + ConnectHandouts (std::size_t needed, Squelches& squelches); + + bool empty() const + { + return m_list.empty(); + } + + bool full() const + { + return m_list.size() >= m_needed; + } + + bool try_insert (Endpoint const& endpoint) + { + return try_insert (endpoint.address); + } + + list_type& list() + { + return m_list; + } + + list_type const& list() const + { + return m_list; + } + + bool try_insert (IP::Endpoint const& endpoint); +}; + +} +} + +#endif diff --git a/src/ripple/peerfinder/impl/Endpoint.cpp b/src/ripple/peerfinder/impl/Endpoint.cpp index 176fe697a6..8756919db6 100644 --- a/src/ripple/peerfinder/impl/Endpoint.cpp +++ b/src/ripple/peerfinder/impl/Endpoint.cpp @@ -27,5 +27,16 @@ Endpoint::Endpoint () { } +Endpoint::Endpoint (IP::Endpoint const& ep, int hops_) + : hops (hops_) + , address (ep) +{ +} + +bool operator< (Endpoint const& lhs, Endpoint const& rhs) +{ + return lhs.address < rhs.address; +} + } } diff --git a/src/ripple/peerfinder/impl/Giveaways.h b/src/ripple/peerfinder/impl/Giveaways.h deleted file mode 100644 index ee0d63d46c..0000000000 --- a/src/ripple/peerfinder/impl/Giveaways.h +++ /dev/null @@ -1,150 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef RIPPLE_PEERFINDER_GIVEAWAYS_H_INCLUDED -#define RIPPLE_PEERFINDER_GIVEAWAYS_H_INCLUDED - -namespace ripple { -namespace PeerFinder { - -/** Holds a rotating set of endpoint messages to give away. */ -class Giveaways -{ -public: - typedef std::vector Bucket; - typedef boost::array Buckets; - - Endpoints m_endpoints; - std::size_t m_remain; - Buckets m_buckets; - - void prepare () - { - for (Buckets::iterator iter (m_buckets.begin()); - iter != m_buckets.end(); ++iter) - iter->reserve (m_endpoints.size ()); - } - -public: - bool is_consistent () - { - // Make sure the counts add up - std::size_t count (0); - for (Buckets::const_iterator iter (m_buckets.begin()); - iter != m_buckets.end(); ++iter) - count += iter->size(); - return count == m_remain; - } - - void refill () - { - // Empty out the buckets - for (Buckets::iterator iter (m_buckets.begin()); - iter != m_buckets.end(); ++iter) - iter->clear(); - // Put endpoints back into buckets - for (Endpoints::const_iterator iter (m_endpoints.begin()); - iter != m_endpoints.end(); ++iter) - { - Endpoint const& ep (*iter); - consistency_check (ep.hops <= Tuning::maxHops); - m_buckets [ep.hops].push_back (&ep); - } - // Shuffle the buckets - for (Buckets::iterator iter (m_buckets.begin()); - iter != m_buckets.end(); ++iter) - std::random_shuffle (iter->begin(), iter->end()); - m_remain = m_endpoints.size(); - consistency_check (is_consistent ()); - } - -public: - explicit Giveaways (Endpoints const& endpoints) - : m_endpoints (endpoints) - , m_remain (0) - { - prepare(); - } - -#if BEAST_COMPILER_SUPPORTS_MOVE_SEMANTICS - Giveaways (Endpoints&& endpoints) - : m_endpoints (endpoints) - , m_remain (0) - { - prepare(); - } -#endif - - /** Append up to `n` Endpoint to the specified container. - The entries added to the container will have hops incremented. - */ - template - void append (Endpoints::size_type n, EndpointContainer& c) - { - n = std::min (n, m_endpoints.size()); - c.reserve (c.size () + n); - if (m_remain < n) - refill (); - for (cyclic_iterator iter ( - m_buckets.begin (), m_buckets.begin (), m_buckets.end()); n;) - { - Bucket& bucket (*iter++); - if (! bucket.empty ()) - { - c.emplace_back (*bucket.back ()); - bucket.pop_back (); - ++c.back ().hops; - --n; - --m_remain; - } - } - consistency_check (is_consistent ()); - } - - /** Retrieve a fresh set of endpoints, preferring high hops. - The entries added to the container will have hops incremented. - */ - template - void reverse_append (Endpoints::size_type n, EndpointContainer& c) - { - n = std::min (n, m_endpoints.size()); - c.reserve (c.size () + n); - if (m_remain < n) - refill (); - for (cyclic_iterator iter ( - m_buckets.rbegin (), m_buckets.rbegin (), m_buckets.rend()); n;) - { - Bucket& bucket (*iter++); - if (! bucket.empty ()) - { - c.emplace_back (*bucket.back ()); - bucket.pop_back (); - ++c.back ().hops; - --n; - --m_remain; - } - } - consistency_check (is_consistent ()); - } -}; - -} -} - -#endif diff --git a/src/ripple/peerfinder/impl/Livecache.cpp b/src/ripple/peerfinder/impl/Livecache.cpp index ff95d7cb6d..bea513cfa0 100644 --- a/src/ripple/peerfinder/impl/Livecache.cpp +++ b/src/ripple/peerfinder/impl/Livecache.cpp @@ -20,13 +20,24 @@ namespace ripple { namespace PeerFinder { +//------------------------------------------------------------------------------ + +namespace detail { + + + +} + +//------------------------------------------------------------------------------ + class LivecacheTests : public UnitTest { public: manual_clock m_clock; // Add the address as an endpoint - void add (uint32 index, uint16 port, Livecache& c) + template + void add (uint32 index, uint16 port, C& c) { Endpoint ep; ep.hops = 0; @@ -39,7 +50,7 @@ public: { beginTestCase ("fetch"); - Livecache c (m_clock, Journal()); + Livecache <> c (m_clock, Journal()); add (1, 1, c); add (2, 1, c); @@ -52,32 +63,7 @@ public: add (6, 2, c); add (7, 1, c); - Endpoints const eps (c.fetch_unique ()); - - struct IsAddr - { - explicit IsAddr (uint32 index_) - : index (index_) - { } - bool operator() (Endpoint const& ep) const - { return ep.address.to_v4().value == index; } - uint32 index; - }; - - expect (std::count_if ( - eps.begin(), eps.end(), IsAddr (1)) == 1); - expect (std::count_if ( - eps.begin(), eps.end(), IsAddr (2)) == 1); - expect (std::count_if ( - eps.begin(), eps.end(), IsAddr (3)) == 1); - expect (std::count_if ( - eps.begin(), eps.end(), IsAddr (4)) == 1); - expect (std::count_if ( - eps.begin(), eps.end(), IsAddr (5)) == 1); - expect (std::count_if ( - eps.begin(), eps.end(), IsAddr (6)) == 1); - expect (std::count_if ( - eps.begin(), eps.end(), IsAddr (7)) == 1); + // VFALCO TODO!!! pass(); } diff --git a/src/ripple/peerfinder/impl/Livecache.h b/src/ripple/peerfinder/impl/Livecache.h index ff8540aaef..3ecf299cfb 100644 --- a/src/ripple/peerfinder/impl/Livecache.h +++ b/src/ripple/peerfinder/impl/Livecache.h @@ -20,11 +20,151 @@ #ifndef RIPPLE_PEERFINDER_LIVECACHE_H_INCLUDED #define RIPPLE_PEERFINDER_LIVECACHE_H_INCLUDED -#include +#include "../../../beast/beast/container/aged_map.h" +#include "../../../beast/beast/type_traits/maybe_const.h" + +#include +#include namespace ripple { namespace PeerFinder { +template +class Livecache; + +namespace detail { + +class LivecacheBase +{ +protected: + struct Element + : boost::intrusive::list_base_hook <> + { + Element (Endpoint const& endpoint_) + : endpoint (endpoint_) + { + } + + Endpoint endpoint; + }; + + typedef boost::intrusive::make_list + >::type list_type; + +public: + /** A list of Endpoint at the same hops + This is a lightweight wrapper around a reference to the underlying + container. + */ + template + class Hop + { + public: + // Iterator transformation to extract the endpoint from Element + struct Transform + : public std::unary_function + { + Endpoint const& operator() (Element const& e) const + { + return e.endpoint; + } + }; + + public: + typedef boost::transform_iterator iterator; + + typedef iterator const_iterator; + + typedef boost::transform_iterator reverse_iterator; + + typedef reverse_iterator const_reverse_iterator; + + iterator begin () const + { + return iterator (m_list.get().cbegin(), + Transform()); + } + + iterator cbegin () const + { + return iterator (m_list.get().cbegin(), + Transform()); + } + + iterator end () const + { + return iterator (m_list.get().cend(), + Transform()); + } + + iterator cend () const + { + return iterator (m_list.get().cend(), + Transform()); + } + + reverse_iterator rbegin () const + { + return reverse_iterator (m_list.get().crbegin(), + Transform()); + } + + reverse_iterator crbegin () const + { + return reverse_iterator (m_list.get().crbegin(), + Transform()); + } + + reverse_iterator rend () const + { + return reverse_iterator (m_list.get().crend(), + Transform()); + } + + reverse_iterator crend () const + { + return reverse_iterator (m_list.get().crend(), + Transform()); + } + + // move the element to the end of the container + void move_back (const_iterator pos) + { + auto& e (const_cast (*pos.base())); + m_list.get().erase (m_list.get().iterator_to (e)); + m_list.get().push_back (e); + } + + private: + explicit Hop (typename maybe_const < + IsConst, list_type>::type& list) + : m_list (list) + { + } + + friend class LivecacheBase; + + std::reference_wrapper ::type> m_list; + }; + +protected: + // Work-around to call Hop's private constructor from Livecache + template + static Hop make_hop (typename maybe_const < + IsConst, list_type>::type& list) + { + return Hop (list); + } +}; + +} + +//------------------------------------------------------------------------------ + /** The Livecache holds the short-lived relayed Endpoint messages. Since peers only advertise themselves when they have open slots, @@ -37,234 +177,382 @@ namespace PeerFinder { launches or for bootstrapping, because they do not have verifiable and locally observed uptime and connectibility information. */ -class Livecache +template > +class Livecache : protected detail::LivecacheBase { -public: - struct Entry; +private: + typedef aged_map < + IP::Endpoint, + Element, + std::chrono::seconds, + std::less , + Allocator + > cache_type; - typedef List EntryList; - - struct Entry : public EntryList::Node - { - Entry (Endpoint const& endpoint_, - clock_type::time_point const& whenExpires_) - : endpoint (endpoint_) - , whenExpires (whenExpires_) - { - } - - Endpoint endpoint; - clock_type::time_point whenExpires; - }; - - typedef std::set SortedTable; - typedef std::unordered_map AddressTable; - - clock_type& m_clock; Journal m_journal; - AddressTable m_byAddress; - SortedTable m_bySorted; - - // Tracks all the cached endpoints stored in the endpoint table - // in oldest-to-newest order. The oldest item is at the head. - EntryList m_list; + cache_type m_cache; public: + typedef Allocator allocator_type; + /** Create the cache. */ Livecache ( clock_type& clock, - Journal journal) - : m_clock (clock) - , m_journal (journal) + Journal journal, + Allocator alloc = Allocator()); + + // + // Iteration by hops + // + // The range [begin, end) provides a sequence of list_type + // where each list contains endpoints at a given hops. + // + + class hops_t { - } + private: + // An endpoint at hops=0 represents the local node. + // Endpoints coming in at maxHops are stored at maxHops +1, + // but not given out (since they would exceed maxHops). They + // are used for automatic connection attempts. + // + typedef std::array Histogram; + typedef std::array lists_type; + + template + struct Transform + : public std::unary_function < + typename lists_type::value_type, Hop > + { + Hop operator() (typename maybe_const < + IsConst, typename lists_type::value_type>::type& list) const + { + return make_hop (list); + } + }; + + public: + typedef boost::transform_iterator , + typename lists_type::iterator> iterator; + + typedef boost::transform_iterator , + typename lists_type::const_iterator> const_iterator; + + typedef boost::transform_iterator , + typename lists_type::reverse_iterator> reverse_iterator; + + typedef boost::transform_iterator , + typename lists_type::const_reverse_iterator> const_reverse_iterator; + + iterator begin () + { + return iterator (m_lists.begin(), + Transform ()); + } + + const_iterator begin () const + { + return const_iterator (m_lists.cbegin(), + Transform ()); + } + + const_iterator cbegin () const + { + return const_iterator (m_lists.cbegin(), + Transform ()); + } + + iterator end () + { + return iterator (m_lists.end(), + Transform ()); + } + + const_iterator end () const + { + return const_iterator (m_lists.cend(), + Transform ()); + } + + const_iterator cend () const + { + return const_iterator (m_lists.cend(), + Transform ()); + } + + reverse_iterator rbegin () + { + return reverse_iterator (m_lists.rbegin(), + Transform ()); + } + + const_reverse_iterator rbegin () const + { + return const_reverse_iterator (m_lists.crbegin(), + Transform ()); + } + + const_reverse_iterator crbegin () const + { + return const_reverse_iterator (m_lists.crbegin(), + Transform ()); + } + + reverse_iterator rend () + { + return reverse_iterator (m_lists.rend(), + Transform ()); + } + + const_reverse_iterator rend () const + { + return const_reverse_iterator (m_lists.crend(), + Transform ()); + } + + const_reverse_iterator crend () const + { + return const_reverse_iterator (m_lists.crend(), + Transform ()); + } + + /** Shuffle each hop list. */ + void shuffle (); + + std::string histogram() const; + + private: + explicit hops_t (Allocator const& alloc); + + void insert (Element& e); + + // Reinsert e at a new hops + void reinsert (Element& e, int hops); + + void remove (Element& e); + + friend class Livecache; + lists_type m_lists; + Histogram m_hist; + } hops; /** Returns `true` if the cache is empty. */ bool empty () const { - return m_byAddress.empty (); + return m_cache.empty (); } /** Returns the number of entries in the cache. */ - AddressTable::size_type size() const + typename cache_type::size_type size() const { - return m_byAddress.size(); + return m_cache.size(); } /** Erase entries whose time has expired. */ - void sweep () - { - auto const now (m_clock.now ()); - AddressTable::size_type count (0); - for (EntryList::iterator iter (m_list.begin()); - iter != m_list.end();) - { - // Short circuit the loop since the list is sorted - if (iter->whenExpires > now) - break; - Entry& entry (*iter); - if (m_journal.trace) m_journal.trace << leftw (18) << - "Livecache expired " << entry.endpoint.address; - // Must erase from list before map - iter = m_list.erase (iter); - meets_postcondition (m_bySorted.erase ( - entry.endpoint) == 1); - meets_postcondition (m_byAddress.erase ( - entry.endpoint.address) == 1); - ++count; - } + void expire (); - if (count > 0) - { - if (m_journal.debug) m_journal.debug << leftw (18) << - "Livecache expired " << count << - ((count > 1) ? " entries" : " entry"); - } - } - - /** Creates or updates an existing entry based on a new message. */ - void insert (Endpoint endpoint) - { - // Caller is responsible for validation - check_precondition (endpoint.hops <= Tuning::maxHops); - auto now (m_clock.now ()); - auto const whenExpires (now + Tuning::liveCacheSecondsToLive); - std::pair result ( - m_byAddress.emplace (std::piecewise_construct, - std::make_tuple (endpoint.address), - std::make_tuple (endpoint, whenExpires))); - Entry& entry (result.first->second); - // Drop duplicates at higher hops - if (! result.second && (endpoint.hops > entry.endpoint.hops)) - { - std::size_t const excess ( - endpoint.hops - entry.endpoint.hops); - if (m_journal.trace) m_journal.trace << leftw(18) << - "Livecache drop " << endpoint.address << - " at hops +" << excess; - return; - } - // Update metadata if the address already exists - if (! result.second) - { - meets_postcondition (m_bySorted.erase ( - result.first->second.endpoint) == 1); - if (endpoint.hops < entry.endpoint.hops) - { - if (m_journal.debug) m_journal.debug << leftw (18) << - "Livecache update " << endpoint.address << - " at hops " << endpoint.hops; - entry.endpoint.hops = endpoint.hops; - } - else - { - if (m_journal.trace) m_journal.trace << leftw (18) << - "Livecache refresh " << endpoint.address << - " at hops " << endpoint.hops; - } - - entry.whenExpires = whenExpires; - - m_list.erase (m_list.iterator_to(entry)); - } - else - { - if (m_journal.debug) m_journal.debug << leftw (18) << - "Livecache insert " << endpoint.address << - " at hops " << endpoint.hops; - } - meets_postcondition (m_bySorted.insert (entry.endpoint).second); - m_list.push_back (entry); - } - - /** Returns the full set of endpoints in a Giveaways class. */ - Giveaways giveaways() - { - Endpoints endpoints; - endpoints.reserve (m_list.size()); - for (EntryList::const_iterator iter (m_list.cbegin()); - iter != m_list.cend(); ++iter) - { - endpoints.push_back (iter->endpoint); - endpoints.back ().hops; - } - if (! endpoints.empty()) - return Giveaways (endpoints); - return Giveaways (endpoints); - } - - /** Returns an ordered list all entries with unique addresses. */ - Endpoints fetch_unique () const - { - Endpoints result; - if (m_bySorted.empty ()) - return result; - result.reserve (m_bySorted.size ()); - Endpoint const& front (*m_bySorted.begin()); - IP::Address prev (front.address.address()); - result.emplace_back (front); - for (SortedTable::const_iterator iter (++m_bySorted.begin()); - iter != m_bySorted.end(); ++iter) - { - IP::Address const addr (iter->address.address()); - if (addr != prev) - { - result.emplace_back (*iter); - ++result.back().hops; - prev = addr; - } - } - return result; - } + /** Creates or updates an existing Element based on a new message. */ + void insert (Endpoint const& ep); /** Produce diagnostic output. */ - void dump (Journal::ScopedStream& ss) const - { - ss << std::endl << std::endl << - "Livecache (size " << m_byAddress.size() << ")"; - for (AddressTable::const_iterator iter (m_byAddress.begin()); - iter != m_byAddress.end(); ++iter) - { - Entry const& entry (iter->second); - ss << std::endl << - entry.endpoint.address << ", " << - entry.endpoint.hops << " hops"; - } - } - - /** Returns a histogram of message counts by hops. */ - typedef boost::array Histogram; - Histogram histogram () const - { - Histogram h; - for (Histogram::iterator iter (h.begin()); - iter != h.end(); ++iter) - *iter = 0; - for (EntryList::const_iterator iter (m_list.begin()); - iter != m_list.end(); ++iter) - ++h[iter->endpoint.hops]; - return h; - } + void dump (Journal::ScopedStream& ss) const; /** Output statistics. */ - void onWrite (PropertyStream::Map& map) - { - clock_type::time_point const now (m_clock.now ()); - map ["size"] = size (); - PropertyStream::Set set ("entries", map); - for (auto entry : m_byAddress) - { - PropertyStream::Map item (set); - Entry const& e (entry.second); - item ["hops"] = e.endpoint.hops; - item ["address"] = e.endpoint.address.to_string (); - std::stringstream ss; - ss << e.whenExpires - now; - item ["expires"] = ss.str(); - } - } + void onWrite (PropertyStream::Map& map); }; +//------------------------------------------------------------------------------ + +template +Livecache ::Livecache ( + clock_type& clock, + Journal journal, + Allocator alloc) + : m_journal (journal) + , m_cache (clock, alloc) + , hops (alloc) +{ +} + +template +void +Livecache ::expire() +{ + std::size_t n (0); + typename cache_type::time_point const expired ( + m_cache.clock().now() - Tuning::liveCacheSecondsToLive); + for (auto iter (m_cache.chronological.begin()); + iter != m_cache.chronological.end() && iter.when() <= expired;) + { + Element& e (iter->second); + hops.remove (e); + iter = m_cache.erase (iter); + ++n; + } + if (n > 0) + { + if (m_journal.debug) m_journal.debug << leftw (18) << + "Livecache expired " << n << + ((n > 1) ? " entries" : " entry"); + } +} + +template +void Livecache ::insert (Endpoint const& ep) +{ + // The caller already incremented hop, so if we got a + // message at maxHops we will store it at maxHops + 1. + // This means we won't give out the address to other peers + // but we will use it to make connections and hand it out + // when redirecting. + // + assert (ep.hops <= (Tuning::maxHops + 1)); + std::pair result ( + m_cache.emplace (ep.address, ep)); + Element& e (result.first->second); + if (result.second) + { + hops.insert (e); + if (m_journal.debug) m_journal.debug << leftw (18) << + "Livecache insert " << ep.address << + " at hops " << ep.hops; + return; + } + else if (! result.second && (ep.hops > e.endpoint.hops)) + { + // Drop duplicates at higher hops + std::size_t const excess ( + ep.hops - e.endpoint.hops); + if (m_journal.trace) m_journal.trace << leftw(18) << + "Livecache drop " << ep.address << + " at hops +" << excess; + return; + } + + m_cache.touch (result.first); + + // Address already in the cache so update metadata + if (ep.hops < e.endpoint.hops) + { + hops.reinsert (e, ep.hops); + if (m_journal.debug) m_journal.debug << leftw (18) << + "Livecache update " << ep.address << + " at hops " << ep.hops; + } + else + { + if (m_journal.trace) m_journal.trace << leftw (18) << + "Livecache refresh " << ep.address << + " at hops " << ep.hops; + } +} + +template +void +Livecache ::dump (Journal::ScopedStream& ss) const +{ + ss << std::endl << std::endl << + "Livecache (size " << m_cache.size() << ")"; + for (auto const& entry : m_cache) + { + auto const& e (entry.second); + ss << std::endl << + e.endpoint.address << ", " << + e.endpoint.hops << " hops"; + } +} + +template +void +Livecache ::onWrite (PropertyStream::Map& map) +{ + typename cache_type::time_point const expired ( + m_cache.clock().now() - Tuning::liveCacheSecondsToLive); + map ["size"] = size (); + map ["hist"] = hops.histogram(); + PropertyStream::Set set ("entries", map); + for (auto iter (m_cache.cbegin()); iter != m_cache.cend(); ++iter) + { + auto const& e (iter->second); + PropertyStream::Map item (set); + item ["hops"] = e.endpoint.hops; + item ["address"] = e.endpoint.address.to_string (); + std::stringstream ss; + ss << iter.when() - expired; + item ["expires"] = ss.str(); + } +} + +//------------------------------------------------------------------------------ + +template +void +Livecache ::hops_t::shuffle() +{ + for (auto& list : m_lists) + { + std::vector > v; + v.reserve (list.size()); + std::copy (list.begin(), list.end(), + std::back_inserter (v)); + std::random_shuffle (v.begin(), v.end()); + list.clear(); + for (auto& e : v) + list.push_back (e); + } +} + +template +std::string +Livecache ::hops_t::histogram() const +{ + std::stringstream ss; + for (auto i : m_hist) + ss << + i << + ((i < Tuning::maxHops + 1) ? ", " : ""); + return ss.str(); +} + +template +Livecache ::hops_t::hops_t (Allocator const& alloc) +{ + std::fill (m_hist.begin(), m_hist.end(), 0); +} + +template +void +Livecache ::hops_t::insert (Element& e) +{ + assert (e.endpoint.hops >= 0 && + e.endpoint.hops <= Tuning::maxHops + 1); + // This has security implications without a shuffle + m_lists [e.endpoint.hops].push_front (e); + ++m_hist [e.endpoint.hops]; +} + +template +void +Livecache ::hops_t::reinsert (Element& e, int hops) +{ + assert (hops >= 0 && hops <= Tuning::maxHops + 1); + list_type& list (m_lists [e.endpoint.hops]); + list.erase (list.iterator_to (e)); + --m_hist [e.endpoint.hops]; + + e.endpoint.hops = hops; + insert (e); +} + +template +void +Livecache ::hops_t::remove (Element& e) +{ + --m_hist [e.endpoint.hops]; + list_type& list (m_lists [e.endpoint.hops]); + list.erase (list.iterator_to (e)); +} + } } diff --git a/src/ripple/peerfinder/impl/Logic.h b/src/ripple/peerfinder/impl/Logic.h index 8294fc5976..1c0a1a0052 100644 --- a/src/ripple/peerfinder/impl/Logic.h +++ b/src/ripple/peerfinder/impl/Logic.h @@ -23,6 +23,14 @@ #include "Fixed.h" #include "SlotImp.h" +#include "handout.h" +#include "ConnectHandouts.h" +#include "RedirectHandouts.h" +#include "SlotHandouts.h" + +#include "../../../beast/beast/container/aged_container_utility.h" + +#include #include namespace ripple { @@ -82,7 +90,7 @@ public: FixedSlots fixed; // Live livecache from mtENDPOINTS messages - Livecache livecache; + Livecache <> livecache; // LiveCache of addresses suitable for gaining initial connections Bootcache bootcache; @@ -111,6 +119,10 @@ public: // A list of dynamic sources to consult as a fallback std::vector > m_sources; + clock_type::time_point m_whenBroadcast; + + ConnectHandouts::Squelches m_squelches; + //-------------------------------------------------------------------------- Logic ( @@ -125,6 +137,8 @@ public: , m_callback (callback) , m_store (store) , m_checker (checker) + , m_whenBroadcast (m_clock.now()) + , m_squelches (m_clock) { setConfig (Config ()); } @@ -177,7 +191,7 @@ public: return; } - for (auto remote_address : addresses) + for (auto const& remote_address : addresses) { auto result (state->fixed.emplace (std::piecewise_construct, std::forward_as_tuple (remote_address), @@ -195,6 +209,62 @@ public: //-------------------------------------------------------------------------- + // Called when the Checker completes a connectivity test + void checkComplete (IP::Endpoint const& address, + IP::Endpoint const & checkedAddress, Checker::Result const& result) + { + if (result.error == boost::asio::error::operation_aborted) + return; + + SharedState::Access state (m_state); + Slots::iterator const iter (state->slots.find (address)); + SlotImp& slot (*iter->second); + + if (iter == state->slots.end()) + { + // The slot disconnected before we finished the check + if (m_journal.debug) m_journal.debug << leftw (18) << + "Logic tested " << address << + " but the connection was closed"; + return; + } + + // Mark that a check for this slot is finished. + slot.connectivityCheckInProgress = false; + + if (! result.error) + { + slot.checked = true; + slot.canAccept = result.canAccept; + + if (slot.canAccept) + { + if (m_journal.debug) m_journal.debug << leftw (18) << + "Logic testing " << address << " succeeded"; + } + else + { + if (m_journal.info) m_journal.info << leftw (18) << + "Logic testing " << address << " failed"; + } + } + else + { + // VFALCO TODO Should we retry depending on the error? + slot.checked = true; + slot.canAccept = false; + + if (m_journal.error) m_journal.error << leftw (18) << + "Logic testing " << iter->first << " with error, " << + result.error.message(); + } + + if (! slot.canAccept) + state->bootcache.on_failure (address); + } + + //-------------------------------------------------------------------------- + SlotImp::ptr new_inbound_slot (IP::Endpoint const& local_endpoint, IP::Endpoint const& remote_endpoint) { @@ -220,7 +290,8 @@ public: // Create the slot SlotImp::ptr const slot (std::make_shared (local_endpoint, - remote_endpoint, fixed (remote_endpoint.address (), state))); + remote_endpoint, fixed (remote_endpoint.address (), state), + m_clock)); // Add slot to table auto const result (state->slots.emplace ( slot->remote_endpoint (), slot)); @@ -254,7 +325,7 @@ public: // Create the slot SlotImp::ptr const slot (std::make_shared ( - remote_endpoint, fixed (remote_endpoint, state))); + remote_endpoint, fixed (remote_endpoint, state), m_clock)); // Add slot to table std::pair result ( @@ -353,8 +424,8 @@ public: state->counts.add (*slot); if (! slot->inbound()) - state->bootcache.onConnectionHandshake ( - slot->remote_endpoint(), doActivate); + state->bootcache.on_success ( + slot->remote_endpoint()); // Mark fixed slot success if (slot->fixed() && ! slot->inbound()) @@ -371,30 +442,88 @@ public: else { if (! slot->inbound()) - state->bootcache.onConnectionHandshake ( - slot->remote_endpoint(), doClose); + state->bootcache.on_success ( + slot->remote_endpoint()); + // Maybe give them some addresses to try if (slot->inbound ()) + redirect (slot, state); + + m_callback.disconnect (slot, true); + } + } + + //-------------------------------------------------------------------------- + + // Validate and clean up the list that we received from the slot. + void preprocess (SlotImp::ptr const& slot, Endpoints& list, + SharedState::Access& state) + { + bool neighbor (false); + for (auto iter (list.begin()); iter != list.end();) + { + Endpoint& ep (*iter); + + // Enforce hop limit + if (ep.hops > Tuning::maxHops) { - // We are full, so send the inbound connection some - // new addresses to try then gracefully close them. - Endpoints const endpoints (getSomeEndpoints ()); - if (! endpoints.empty ()) + if (m_journal.warning) m_journal.warning << leftw (18) << + "Endpoints drop " << ep.address << + " for excess hops " << ep.hops; + iter = list.erase (iter); + continue; + } + + // See if we are directly connected + if (ep.hops == 0) + { + if (! neighbor) { - if (m_journal.trace) m_journal.trace << leftw (18) << - "Logic redirect " << slot->remote_endpoint() << - " with " << endpoints.size() << - ((endpoints.size() > 1) ? " addresses" : " address"); - m_callback.send (slot, endpoints); + // Fill in our neighbors remote address + neighbor = true; + ep.address = slot->remote_endpoint().at_port ( + ep.address.port ()); } else { if (m_journal.warning) m_journal.warning << leftw (18) << - "Logic deferred " << slot->remote_endpoint(); + "Endpoints drop " << ep.address << + " for extra self"; + iter = list.erase (iter); + continue; } } - m_callback.disconnect (slot, true); + // Discard invalid addresses + if (! is_valid_address (ep.address)) + { + if (m_journal.warning) m_journal.warning << leftw (18) << + "Endpoints drop " << ep.address << + " as invalid"; + iter = list.erase (iter); + continue; + } + + // Filter duplicates + if (std::any_of (list.begin(), iter, + [ep](Endpoints::value_type const& other) + { + return ep.address == other.address; + })) + { + if (m_journal.warning) m_journal.warning << leftw (18) << + "Endpoints drop " << ep.address << + " as duplicate"; + iter = list.erase (iter); + continue; + } + + // Increment hop count on the incoming message, so + // we store it at the hop count we will send it at. + // + ++ep.hops; + + ++iter; } } @@ -404,74 +533,41 @@ public: "Endpoints from " << slot->remote_endpoint () << " contained " << list.size () << ((list.size() > 1) ? " entries" : " entry"); + SharedState::Access state (m_state); + // The object must exist in our table assert (state->slots.find (slot->remote_endpoint ()) != state->slots.end ()); + // Must be handshaked! assert (slot->state() == Slot::active); - // Preprocess the endpoints - { - bool neighbor (false); - for (Endpoints::iterator iter (list.begin()); - iter != list.end();) - { - Endpoint& ep (*iter); - if (ep.hops > Tuning::maxHops) - { - if (m_journal.warning) m_journal.warning << leftw (18) << - "Endpoints drop " << ep.address << - " for excess hops " << ep.hops; - iter = list.erase (iter); - continue; - } - if (ep.hops == 0) - { - if (! neighbor) - { - // Fill in our neighbors remote address - neighbor = true; - ep.address = slot->remote_endpoint().at_port ( - ep.address.port ()); - } - else - { - if (m_journal.warning) m_journal.warning << leftw (18) << - "Endpoints drop " << ep.address << - " for extra self"; - iter = list.erase (iter); - continue; - } - } - if (! is_valid_address (ep.address)) - { - if (m_journal.warning) m_journal.warning << leftw (18) << - "Endpoints drop " << ep.address << - " as invalid"; - iter = list.erase (iter); - continue; - } - ++iter; - } - } + + preprocess (slot, list, state); clock_type::time_point const now (m_clock.now()); - for (Endpoints::const_iterator iter (list.begin()); - iter != list.end(); ++iter) + for (auto iter (list.cbegin()); iter != list.cend(); ++iter) { Endpoint const& ep (*iter); - //slot->received.insert (ep.address); + assert (ep.hops != 0); - if (ep.hops == 0) + slot->recent.insert (ep.address, ep.hops); + + // Note hops has been incremented, so 1 + // means a directly connected neighbor. + // + if (ep.hops == 1) { if (slot->connectivityCheckInProgress) { if (m_journal.warning) m_journal.warning << leftw (18) << "Logic testing " << ep.address << " already in progress"; + continue; } - else if (! slot->checked) + + if (! slot->checked) { // Mark that a check for this slot is now in progress. slot->connectivityCheckInProgress = true; @@ -487,27 +583,28 @@ public: // that the neighbor sends when we perform the // listening test. They will just send us another // one in a few seconds. + + continue; } - else if (slot->canAccept) - { - // We only add to the livecache if the neighbor passed the - // listening test, else we silently drop their messsage - // since their listening port is misconfigured. - // - state->livecache.insert (ep); - state->bootcache.insert (ep.address); - } - } - else - { - state->livecache.insert (ep); - state->bootcache.insert (ep.address); + + // If they failed the test then skip the address + if (! slot->canAccept) + continue; } + + // We only add to the livecache if the neighbor passed the + // listening test, else we silently drop their messsage + // since their listening port is misconfigured. + // + state->livecache.insert (ep); + state->bootcache.insert (ep.address); } slot->whenAcceptEndpoints = now + Tuning::secondsPerMessage; } + //-------------------------------------------------------------------------- + void on_legacy_endpoints (IPAddresses const& list) { // Ignoring them also seems a valid choice. @@ -566,7 +663,7 @@ public: case Slot::connect: case Slot::connected: - state->bootcache.onConnectionFailure (slot->remote_endpoint ()); + state->bootcache.on_failure (slot->remote_endpoint ()); // VFALCO TODO If the address exists in the ephemeral/live // endpoint livecache then we should mark the failure // as if it didn't pass the listening test. We should also @@ -574,15 +671,13 @@ public: break; case Slot::active: - if (! slot->inbound ()) - state->bootcache.onConnectionClosed (slot->remote_endpoint ()); if (m_journal.trace) m_journal.trace << leftw (18) << - "Logic closed active " << slot->remote_endpoint(); + "Logic close " << slot->remote_endpoint(); break; case Slot::closing: if (m_journal.trace) m_journal.trace << leftw (18) << - "Logic closed " << slot->remote_endpoint(); + "Logic finished " << slot->remote_endpoint(); break; default: @@ -596,7 +691,7 @@ public: // Returns `true` if the address matches a fixed slot address bool fixed (IP::Endpoint const& endpoint, SharedState::Access& state) const { - for (auto entry : state->fixed) + for (auto const& entry : state->fixed) if (entry.first == endpoint) return true; return false; @@ -606,76 +701,12 @@ public: // Note that this does not use the port information in the IP::Endpoint bool fixed (IP::Address const& address, SharedState::Access& state) const { - for (auto entry : state->fixed) + for (auto const& entry : state->fixed) if (entry.first.address () == address) return true; return false; } - //-------------------------------------------------------------------------- - - /** Returns a new set of connection addresses from the live cache. */ - IPAddresses fetch_livecache (std::size_t needed, SharedState::Access& state) - { - Endpoints endpoints (state->livecache.fetch_unique()); - Endpoints temp; - temp.reserve (endpoints.size ()); - - { - // Remove the addresses we are currently connected to - struct LessWithoutPortSet - { - bool operator() ( - Endpoint const& lhs, IP::Endpoint const& rhs) const - { - return lhs.address.address() < rhs.address(); - } - bool operator() ( - Endpoint const& lhs, Endpoint const& rhs) const - { - return lhs.address.address() < rhs.address.address(); - } - bool operator() ( - IP::Endpoint const& lhs, Endpoint const& rhs) const - { - return lhs.address() < rhs.address.address(); - } - bool operator() ( - IP::Endpoint const& lhs, IP::Endpoint const& rhs) const - { - return lhs.address() < rhs.address(); - } - }; - std::set_difference (endpoints.begin (), endpoints.end (), - state->connected_addresses.begin (), state->connected_addresses.end (), - std::back_inserter (temp), LessWithoutPortSet ()); - std::swap (endpoints, temp); - temp.clear (); - } - - { - // Sort by hops descending - struct LessHops - { - bool operator() (Endpoint const& lhs, Endpoint const& rhs) const - { - return lhs.hops > rhs.hops; - } - }; - std::sort (endpoints.begin (), endpoints.end (), LessHops ()); - } - - if (endpoints.size () > needed) - endpoints.resize (needed); - - IPAddresses result; - result.reserve (endpoints.size ()); - for (Endpoints::const_iterator iter (endpoints.begin ()); - iter != endpoints.end (); ++iter) - result.push_back (iter->address); - return result; - } - //-------------------------------------------------------------------------- // // Connection Strategy @@ -705,184 +736,24 @@ public: } } - /** Adds eligible bootcache addresses for outbound attempts. */ - template - void get_bootcache (std::size_t needed, Container& c, SharedState::Access& state) - { - // Get everything - auto endpoints (state->bootcache.fetch ()); - - struct LessRank - { - bool operator() (Bootcache::Endpoint const& lhs, - Bootcache::Endpoint const& rhs) const - { - if (lhs.uptime() > rhs.uptime()) - return true; - else if (lhs.uptime() <= rhs.uptime() && rhs.uptime().count() != 0) - return false; - if (lhs.valence() > rhs.valence()) - return true; - return false; - } - }; - - { - // Sort ignoring port - struct LessWithoutPort - { - bool operator() (Bootcache::Endpoint const& lhs, - Bootcache::Endpoint const& rhs) const - { - if (lhs.address().at_port (0) < rhs.address().at_port (0)) - return true; - // Break ties by preferring higher ranks - //return m_rank (lhs, rhs); - return false; - } - - LessRank m_rank; - }; - std::sort (endpoints.begin (), endpoints.end (), LessWithoutPort()); - } - - Bootcache::Endpoints temp; - temp.reserve (endpoints.size ()); - - { - // Remove all but the first unique addresses ignoring port - struct EqualWithoutPort - { - bool operator() (Bootcache::Endpoint const& lhs, - Bootcache::Endpoint const& rhs) const - { - return lhs.address().at_port (0) == - rhs.address().at_port (0); - } - }; - - std::unique_copy (endpoints.begin (), endpoints.end (), - std::back_inserter (temp), EqualWithoutPort ()); - std::swap (endpoints, temp); - temp.clear (); - } - - { - // Remove the addresses we are currently connected to - struct LessWithoutPortSet - { - bool operator() (Bootcache::Endpoint const& lhs, - IP::Endpoint const& rhs) const - { - return lhs.address().at_port (0) < rhs.at_port (0); - } - bool operator() (Bootcache::Endpoint const& lhs, - Bootcache::Endpoint const& rhs) const - { - return lhs.address().at_port (0) < - rhs.address().at_port (0); - } - bool operator() (IP::Endpoint const& lhs, - Bootcache::Endpoint const& rhs) const - { - return lhs.at_port (0) < rhs.address().at_port (0); - } - bool operator() (IP::Endpoint const& lhs, - IP::Endpoint const& rhs) const - { - return lhs.at_port (0) < rhs.at_port (0); - } - }; - std::set_difference (endpoints.begin (), endpoints.end (), - state->connected_addresses.begin (), state->connected_addresses.end (), - std::back_inserter (temp), LessWithoutPortSet ()); - std::swap (endpoints, temp); - temp.clear (); - } - - { - // Sort by rank descending - std::sort (endpoints.begin (), endpoints.end (), LessRank ()); - } - - if (endpoints.size () > needed) - endpoints.resize (needed); - - c.reserve (endpoints.size ()); - for (Bootcache::Endpoints::const_iterator iter (endpoints.begin ()); - iter != endpoints.end (); ++iter) - c.emplace_back (iter->address()); - } - - /** Returns a new set of connection addresses from the live cache. */ - template - void get_livecache (std::size_t needed, Container& c, - SharedState::Access& state) - { - Endpoints endpoints (state->livecache.fetch_unique()); - Endpoints temp; - temp.reserve (endpoints.size ()); - - { - // Remove the addresses we are currently connected to - struct LessWithoutPortSet - { - bool operator() ( - Endpoint const& lhs, IP::Endpoint const& rhs) const - { - return lhs.address.address() < rhs.address(); - } - bool operator() ( - Endpoint const& lhs, Endpoint const& rhs) const - { - return lhs.address.address() < rhs.address.address(); - } - bool operator() ( - IP::Endpoint const& lhs, Endpoint const& rhs) const - { - return lhs.address() < rhs.address.address(); - } - bool operator() ( - IP::Endpoint const& lhs, IP::Endpoint const& rhs) const - { - return lhs.address() < rhs.address(); - } - }; - std::set_difference (endpoints.begin (), endpoints.end (), - state->connected_addresses.begin (), state->connected_addresses.end (), - std::back_inserter (temp), LessWithoutPortSet ()); - std::swap (endpoints, temp); - temp.clear (); - } - - { - // Sort by hops descending - struct LessHops - { - bool operator() (Endpoint const& lhs, Endpoint const& rhs) const - { - return lhs.hops > rhs.hops; - } - }; - std::sort (endpoints.begin (), endpoints.end (), LessHops ()); - } - - if (endpoints.size () > needed) - endpoints.resize (needed); - - IPAddresses result; - result.reserve (endpoints.size ()); - for (Endpoints::const_iterator iter (endpoints.begin ()); - iter != endpoints.end (); ++iter) - c.push_back (iter->address); - } - //-------------------------------------------------------------------------- + // Adds slot addresses to the squelched set + void squelch_slots (SharedState::Access& state) + { + for (auto const& s : state->slots) + { + auto const result (m_squelches.insert ( + s.second->remote_endpoint().address())); + if (! result.second) + m_squelches.touch (result.first); + } + } + /** Create new outbound connection attempts as needed. This implements PeerFinder's "Outbound Connection Strategy" */ - void makeOutgoingConnections () + void autoconnect () { SharedState::Access state (m_state); @@ -891,8 +762,11 @@ public: auto needed (state->counts.attempts_needed ()); if (needed == 0) return; - std::vector list; - list.reserve (needed); + + ConnectHandouts h (needed, m_squelches); + + // Make sure we don't connect to already-connected entries. + squelch_slots (state); // 1. Use Fixed if: // Fixed active count is below fixed count AND @@ -901,16 +775,16 @@ public: // if (state->counts.fixed_active() < state->fixed.size ()) { - get_fixed (needed, list, state); + get_fixed (needed, h.list(), state); - if (! list.empty ()) + if (! h.list().empty ()) { if (m_journal.debug) m_journal.debug << leftw (18) << - "Logic connect " << list.size() << " fixed"; - m_callback.connect (list); + "Logic connect " << h.list().size() << " fixed"; + m_callback.connect (h.list()); return; } - + if (state->counts.attempts() > 0) { if (m_journal.debug) m_journal.debug << leftw (18) << @@ -931,21 +805,26 @@ public: // There are any entries in the cache OR // Any outbound attempts are in progress // - get_livecache (needed, list, state); - if (! list.empty ()) { - if (m_journal.debug) m_journal.debug << leftw (18) << - "Logic connect " << list.size () << " live " << - ((list.size () > 1) ? "endpoints" : "endpoint"); - m_callback.connect (list); - return; - } - else if (state->counts.attempts() > 0) - { - if (m_journal.debug) m_journal.debug << leftw (18) << - "Logic waiting on " << - state->counts.attempts() << " attempts"; - return; + state->livecache.hops.shuffle(); + handout (&h, (&h)+1, + state->livecache.hops.rbegin(), + state->livecache.hops.rend()); + if (! h.list().empty ()) + { + if (m_journal.debug) m_journal.debug << leftw (18) << + "Logic connect " << h.list().size () << " live " << + ((h.list().size () > 1) ? "endpoints" : "endpoint"); + m_callback.connect (h.list()); + return; + } + else if (state->counts.attempts() > 0) + { + if (m_journal.debug) m_journal.debug << leftw (18) << + "Logic waiting on " << + state->counts.attempts() << " attempts"; + return; + } } /* 3. Bootcache refill @@ -964,13 +843,16 @@ public: // 4. Use Bootcache if: // There are any entries we haven't tried lately // - get_bootcache (needed, list, state); - if (! list.empty ()) + for (auto iter (state->bootcache.begin()); + ! h.full() && iter != state->bootcache.end(); ++iter) + h.try_insert (*iter); + + if (! h.list().empty ()) { if (m_journal.debug) m_journal.debug << leftw (18) << - "Logic connect " << list.size () << " boot " << - ((list.size () > 1) ? "addresses" : "address"); - m_callback.connect (list); + "Logic connect " << h.list().size () << " boot " << + ((h.list().size () > 1) ? "addresses" : "address"); + m_callback.connect (h.list()); return; } @@ -992,35 +874,38 @@ public: //-------------------------------------------------------------------------- // Called periodically to sweep the livecache and remove aged out items. - void sweepCache () + void expire () { SharedState::Access state (m_state); - state->livecache.sweep (); - for (auto iter : state->slots) - { - //Slot& slot (*iter->second); - //slot.received.cycle(); - } - } - // Called periodically to update uptime for connected outbound peers. - void processUptime (SharedState::Access& state) - { - for (auto entry : state->slots) - { - Slot const& slot (*entry.second); - if (! slot.inbound() && slot.state() == Slot::active) - state->bootcache.onConnectionActive ( - slot.remote_endpoint()); - } + // Expire the Livecache + state->livecache.expire (); + + // Expire the recent cache in each slot + for (auto const& entry : state->slots) + entry.second->expire(); + + // Expire the recent attempts table + beast::expire (m_squelches, + Tuning::recentAttemptDuration); } // Called every so often to perform periodic tasks. void periodicActivity () { SharedState::Access state (m_state); - processUptime (state); + + clock_type::time_point const now (m_clock.now()); + + autoconnect (); + expire (); state->bootcache.periodicActivity (); + + if (m_whenBroadcast <= now) + { + broadcast (); + m_whenBroadcast = now + Tuning::secondsPerMessage; + } } //-------------------------------------------------------------------------- @@ -1092,7 +977,6 @@ public: "Logic failed " << "'" << source->name() << "' fetch, " << results.error.message(); } - } //-------------------------------------------------------------------------- @@ -1125,49 +1009,29 @@ public: return true; } - // Creates a set of endpoints suitable for a temporary slot. - // Sent to a slot when we are full, before disconnecting them. - // - Endpoints getSomeEndpoints () + //-------------------------------------------------------------------------- + + // Gives a slot a set of addresses to try next since we're full + void redirect (SlotImp::ptr const& slot, SharedState::Access& state) { - SharedState::Access state (m_state); - Endpoints result (state->livecache.fetch_unique ()); - std::random_shuffle (result.begin(), result.end()); - if (result.size () > Tuning::redirectEndpointCount) - result.resize (Tuning::redirectEndpointCount); - return result; - } + RedirectHandouts h (slot); + state->livecache.hops.shuffle(); + handout (&h, (&h)+1, + state->livecache.hops.begin(), + state->livecache.hops.end()); - // Send mtENDPOINTS for the specified slot - void sendEndpointsTo (Slot::ptr const& slot, Giveaways& g) - { - Endpoints endpoints; - - if (endpoints.size() < Tuning::numberOfEndpoints) - { - SharedState::Access state (m_state); - - // Add an entry for ourselves if: - // 1. We want incoming - // 2. We have counts - // 3. We haven't failed the firewalled test - // - if (state->config.wantIncoming && state->counts.inboundSlots() > 0) - endpoints.push_back (thisEndpoint (state)); - } - - if (endpoints.size() < Tuning::numberOfEndpoints) - { - g.append (Tuning::numberOfEndpoints - endpoints.size(), endpoints); - } - - if (! endpoints.empty()) + if (! h.list().empty ()) { if (m_journal.trace) m_journal.trace << leftw (18) << - "Logic sending " << slot->remote_endpoint() << - " with " << endpoints.size() << - ((endpoints.size() > 1) ? " endpoints" : " endpoint"); - m_callback.send (slot, endpoints); + "Logic redirect " << slot->remote_endpoint() << + " with " << h.list().size() << + ((h.list().size() == 1) ? " address" : " addresses"); + m_callback.send (slot, h.list()); + } + else + { + if (m_journal.warning) m_journal.warning << leftw (18) << + "Logic deferred " << slot->remote_endpoint(); } } @@ -1175,157 +1039,49 @@ public: void broadcast () { SharedState::Access state (m_state); - if (! state->slots.empty()) + + std::vector targets; + { - clock_type::time_point const now (m_clock.now()); - clock_type::time_point const whenSendEndpoints ( - now + Tuning::secondsPerMessage); - Giveaways g (state->livecache.giveaways ()); - for (auto entry : state->slots) - { - auto& slot (entry.second); - if (slot->state() == Slot::active) + // build list of active slots + std::vector slots; + slots.reserve (state->slots.size()); + std::for_each (state->slots.cbegin(), state->slots.cend(), + [&slots](Slots::value_type const& value) { - if (slot->whenSendEndpoints <= now) - { - sendEndpointsTo (slot, g); - slot->whenSendEndpoints = whenSendEndpoints; - } - } - } + if (value.second->state() == Slot::active) + slots.emplace_back (value.second); + }); + std::random_shuffle (slots.begin(), slots.end()); + + // build target vector + targets.reserve (slots.size()); + std::for_each (slots.cbegin(), slots.cend(), + [&targets](SlotImp::ptr const& slot) + { + targets.emplace_back (slot); + }); + } + + // build sequence of endpoints by hops + state->livecache.hops.shuffle(); + handout (targets.begin(), targets.end(), + state->livecache.hops.begin(), + state->livecache.hops.end()); + + // broadcast + for (auto const& t : targets) + { + SlotImp::ptr const& slot (t.slot()); + auto const& list (t.list()); + if (m_journal.trace) m_journal.trace << leftw (18) << + "Logic sending " << slot->remote_endpoint() << + " with " << list.size() << + ((list.size() == 1) ? " endpoint" : " endpoints"); + m_callback.send (slot, list); } } - // Called when the Checker completes a connectivity test - void checkComplete (IP::Endpoint const& address, - IP::Endpoint const & checkedAddress, Checker::Result const& result) - { - if (result.error == boost::asio::error::operation_aborted) - return; - - SharedState::Access state (m_state); - Slots::iterator const iter (state->slots.find (address)); - SlotImp& slot (*iter->second); - - if (iter == state->slots.end()) - { - // The slot disconnected before we finished the check - if (m_journal.debug) m_journal.debug << leftw (18) << - "Logic tested " << address << - " but the connection was closed"; - return; - } - - // Mark that a check for this slot is finished. - slot.connectivityCheckInProgress = false; - - if (! result.error) - { - slot.checked = true; - slot.canAccept = result.canAccept; - - if (slot.canAccept) - { - if (m_journal.debug) m_journal.debug << leftw (18) << - "Logic testing " << address << " succeeded"; - } - else - { - if (m_journal.info) m_journal.info << leftw (18) << - "Logic testing " << address << " failed"; - } - } - else - { - // VFALCO TODO Should we retry depending on the error? - slot.checked = true; - slot.canAccept = false; - - if (m_journal.error) m_journal.error << leftw (18) << - "Logic testing " << iter->first << " with error, " << - result.error.message(); - } - - if (slot.canAccept) - { - // VFALCO TODO Why did I think this line was needed? - //state->bootcache.onConnectionHandshake (address); - } - else - { - state->bootcache.onConnectionFailure (address); - } - } - - //-------------------------------------------------------------------------- - // - // Socket Hooks - // - //-------------------------------------------------------------------------- - - // Returns `true` if the address matches the remote address of one - // of our outbound sockets. - // - // VFALCO TODO Do the lookup using an additional index by local address - bool haveLocalOutboundAddress (IP::Endpoint const& local_address, - SharedState::Access& state) - { - for (Slots::const_iterator iter (state->slots.begin()); - iter != state->slots.end(); ++iter) - { - Slot const& slot (*iter->second); - if (! slot.inbound () && - slot.local_endpoint() != boost::none && - *slot.local_endpoint() == local_address) - return true; - } - return false; - } - - //-------------------------------------------------------------------------- - -#if 0 - void onPeerAddressChanged ( - IP::Endpoint const& currentAddress, IP::Endpoint const& newAddress) - { - // VFALCO TODO Demote this to trace after PROXY is tested. - m_journal.debug << - "onPeerAddressChanged (" << currentAddress << - ", " << newAddress << ")"; - - SharedState::Access state (m_state); - - Connections::iterator iter ( - state->connections.find (currentAddress)); - - // Current address must exist! - consistency_check (iter != state->connections.end()); - - Connection& connection (iter->second); - - // Connection must be inbound! - consistency_check (connection.inbound()); - - // Connection must be connected! - consistency_check (connection.state() == Connection::stateConnected); - - // Create a new Connection entry for the new address - std::pair result ( - state->connections.emplace (newAddress, - Connection (iter->second))); - - // New address must not already exist! - consistency_check (result.second); - - // Remove old Connection entry - state->connections.erase (iter); - - // Update the address on the slot - Slot& slot (result.first->second.peersIterator()->second); - slot.address = newAddress; - } -#endif - //-------------------------------------------------------------------------- // // PropertyStream @@ -1334,7 +1090,7 @@ public: void writeSlots (PropertyStream::Set& set, Slots const& slots) { - for (auto entry : slots) + for (auto const& entry : slots) { PropertyStream::Map item (set); SlotImp const& slot (*entry.second); @@ -1418,40 +1174,6 @@ public: }; return "?"; } - - void dump_peers (Journal::ScopedStream& ss, - SharedState::ConstAccess const& state) const - { - ss << std::endl << std::endl << - "Slots"; - for (auto const entry : state->slots) - { - SlotImp const& slot (*entry.second); - ss << std::endl << - slot.remote_endpoint () << - (slot.inbound () ? " (in) " : " ") << - stateString (slot.state ()) << " " - // VFALCO NOTE currently this is broken - /* - << ((slot.public_key() != boost::none) ? - *slot.public_key() : "") - */ - ; - } - } - - void dump (Journal::ScopedStream& ss) const - { - SharedState::ConstAccess state (m_state); - - state->bootcache.dump (ss); - state->livecache.dump (ss); - dump_peers (ss, state); - ss << std::endl << - state->counts.state_string (); - ss << std::endl; - } - }; } @@ -1461,33 +1183,16 @@ public: /* -Terms +- recent tables entries should last equal to the cache time to live +- never send a slot a message thats in its recent table at a lower hop count +- when sending a message to a slot, add it to its recent table at one lower hop count -'Book' an order book -'Offer' an entry in a book -'Inverse Book' the book for the opposite direction +Giveaway logic -'Directory' Holds offers with the same quality level +When do we give away? -An order book is a list of offers. The book has the following -canonical order. The primary key is the quality (ratio of input to -output). The secondary key is an ordinal to break ties for two offers -with the same quality (first come first serve). +- To one inbound connection when we redirect due to full -Three places where books are iterated in canonical order: - -1. When responding to a client request for a book - -2. When placing an offer in the inverse book - -3. When processing a payment that goes through the book - -A directory is a type of structure in the ledger - - - -Invariants: - -- All that is needed to process a transaction is the current Ledger object. +- To all slots at every broadcast */ diff --git a/src/ripple/peerfinder/impl/Manager.cpp b/src/ripple/peerfinder/impl/Manager.cpp index dae6c4c24f..93e97f4f2e 100644 --- a/src/ripple/peerfinder/impl/Manager.cpp +++ b/src/ripple/peerfinder/impl/Manager.cpp @@ -36,9 +36,7 @@ public: SerializedContext m_context; CheckerAdapter m_checker; Logic m_logic; - DeadlineTimer m_connectTimer; - DeadlineTimer m_messageTimer; - DeadlineTimer m_cacheTimer; + DeadlineTimer m_secondsTimer; //-------------------------------------------------------------------------- @@ -56,9 +54,7 @@ public: , m_store (journal) , m_checker (m_context, m_queue) , m_logic (clock, callback, m_store, m_checker, journal) - , m_connectTimer (this) - , m_messageTimer (this) - , m_cacheTimer (this) + , m_secondsTimer (this) { } @@ -223,9 +219,7 @@ public: m_journal.debug << "Stopping"; m_checker.cancel (); m_logic.stop (); - m_connectTimer.cancel(); - m_messageTimer.cancel(); - m_cacheTimer.cancel(); + m_secondsTimer.cancel(); m_queue.dispatch ( m_context.wrap ( bind (&Thread::signalThreadShouldExit, this))); @@ -248,35 +242,14 @@ public: void onDeadlineTimer (DeadlineTimer& timer) { - if (timer == m_connectTimer) + if (timer == m_secondsTimer) { m_queue.dispatch ( m_context.wrap ( - bind (&Logic::makeOutgoingConnections, &m_logic))); + bind (&Logic::periodicActivity, &m_logic))); - m_connectTimer.setExpiration (Tuning::secondsPerConnect); + m_secondsTimer.setExpiration (Tuning::secondsPerConnect); } - else if (timer == m_messageTimer) - { - m_queue.dispatch ( - m_context.wrap ( - bind (&Logic::broadcast, &m_logic))); - - m_messageTimer.setExpiration (Tuning::secondsPerMessage); - } - else if (timer == m_cacheTimer) - { - m_queue.dispatch ( - m_context.wrap ( - bind (&Logic::sweepCache, &m_logic))); - - m_cacheTimer.setExpiration (Tuning::liveCacheSecondsToLive); - } - - // VFALCO NOTE Bit of a hack here... - m_queue.dispatch ( - m_context.wrap ( - bind (&Logic::periodicActivity, &m_logic))); } void init () @@ -299,12 +272,7 @@ public: m_logic.load (); } - m_connectTimer.setExpiration (Tuning::secondsPerConnect); - m_messageTimer.setExpiration (Tuning::secondsPerMessage); - m_cacheTimer.setExpiration (Tuning::liveCacheSecondsToLive); - - m_queue.post (m_context.wrap ( - bind (&Logic::makeOutgoingConnections, &m_logic))); + m_secondsTimer.setExpiration (std::chrono::seconds (1)); } void run () diff --git a/src/ripple/peerfinder/impl/RedirectHandouts.cpp b/src/ripple/peerfinder/impl/RedirectHandouts.cpp new file mode 100644 index 0000000000..ed23529e72 --- /dev/null +++ b/src/ripple/peerfinder/impl/RedirectHandouts.cpp @@ -0,0 +1,70 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "RedirectHandouts.h" + +namespace ripple { +namespace PeerFinder { + +RedirectHandouts::RedirectHandouts (SlotImp::ptr const& slot) + : m_slot (slot) +{ + m_list.reserve (Tuning::redirectEndpointCount); +} + +bool +RedirectHandouts::try_insert (Endpoint const& ep) +{ + if (full ()) + return false; + + // VFALCO NOTE This check can be removed when we provide the + // addresses in a peer HTTP handshake instead of + // the tmENDPOINTS message. + // + if (ep.hops > Tuning::maxHops) + return false; + + // Don't send them our address + if (ep.hops == 0) + return false; + + // Don't send them their own address + if (m_slot->remote_endpoint().address() == + ep.address.address()) + return false; + + // Make sure the address isn't already in our list + if (std::any_of (m_list.begin(), m_list.end(), + [&ep](Endpoint const& other) + { + // Ignore port for security reasons + return other.address.address() == ep.address.address(); + })) + { + return false; + } + + m_list.emplace_back (ep.address, ep.hops); + + return true; +} + +} +} diff --git a/src/ripple/peerfinder/impl/Seen.h b/src/ripple/peerfinder/impl/RedirectHandouts.h similarity index 59% rename from src/ripple/peerfinder/impl/Seen.h rename to src/ripple/peerfinder/impl/RedirectHandouts.h index 17e92b2999..c6ae51ca3a 100644 --- a/src/ripple/peerfinder/impl/Seen.h +++ b/src/ripple/peerfinder/impl/RedirectHandouts.h @@ -17,13 +17,44 @@ */ //============================================================================== -#ifndef RIPPLE_PEERFINDER_SEEN_H_INCLUDED -#define RIPPLE_PEERFINDER_SEEN_H_INCLUDED +#ifndef RIPPLE_PEERFINDER_REDIRECTHANDOUTS_H_INCLUDED +#define RIPPLE_PEERFINDER_REDIRECTHANDOUTS_H_INCLUDED + +#include "SlotImp.h" +#include "Tuning.h" namespace ripple { namespace PeerFinder { -/** Tracks endpoints we've seen from a peer. */ +/** Receives handouts for redirecting a connection. + An incoming connection request is redirected when we are full on slots. +*/ +class RedirectHandouts +{ +public: + RedirectHandouts (SlotImp::ptr const& slot); + + bool full () const + { + return m_list.size() >= Tuning::redirectEndpointCount; + } + + SlotImp::ptr const& slot () const + { + return m_slot; + } + + std::vector const& list() const + { + return m_list; + } + + bool try_insert (Endpoint const& ep); + +private: + SlotImp::ptr m_slot; + std::vector m_list; +}; } } diff --git a/src/ripple/peerfinder/impl/SlotHandouts.cpp b/src/ripple/peerfinder/impl/SlotHandouts.cpp new file mode 100644 index 0000000000..c35e373ef9 --- /dev/null +++ b/src/ripple/peerfinder/impl/SlotHandouts.cpp @@ -0,0 +1,70 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "SlotHandouts.h" + +namespace ripple { +namespace PeerFinder { + +SlotHandouts::SlotHandouts (SlotImp::ptr const& slot) + : m_slot (slot) +{ + m_list.reserve (Tuning::numberOfEndpoints); +} + +bool +SlotHandouts::try_insert (Endpoint const& ep) +{ + if (full ()) + return false; + + if (ep.hops > Tuning::maxHops) + return false; + + if (m_slot->recent.filter (ep.address, ep.hops)) + return false; + + // Don't send them their own address + if (m_slot->remote_endpoint().address() == + ep.address.address()) + return false; + + // Make sure the address isn't already in our list + if (std::any_of (m_list.begin(), m_list.end(), + [&ep](Endpoint const& other) + { + // Ignore port for security reasons + return other.address.address() == ep.address.address(); + })) + return false; + + m_list.emplace_back (ep.address, ep.hops); + + // Insert into this slot's recent table. Although the endpoint + // didn't come from the slot, adding it to the slot's table + // prevents us from sending it again until it has expired from + // the other end's cache. + // + m_slot->recent.insert (ep.address, ep.hops); + + return true; +} + +} +} diff --git a/src/ripple/peerfinder/impl/Sorts.h b/src/ripple/peerfinder/impl/SlotHandouts.h similarity index 59% rename from src/ripple/peerfinder/impl/Sorts.h rename to src/ripple/peerfinder/impl/SlotHandouts.h index a868596284..0f57fad9f9 100644 --- a/src/ripple/peerfinder/impl/Sorts.h +++ b/src/ripple/peerfinder/impl/SlotHandouts.h @@ -17,34 +17,46 @@ */ //============================================================================== -#ifndef RIPPLE_PEERFINDER_SORTS_H_INCLUDED -#define RIPPLE_PEERFINDER_SORTS_H_INCLUDED +#ifndef RIPPLE_PEERFINDER_SLOTHANDOUTS_H_INCLUDED +#define RIPPLE_PEERFINDER_SLOTHANDOUTS_H_INCLUDED + +#include "SlotImp.h" +#include "Tuning.h" namespace ripple { namespace PeerFinder { -/** Total ordering for Endpoint. - - The ordering must have these properties: - - - Endpoints with addresses differing only by port should be - sorted adjacent, by descending hop count. - - - The port number must participate in the ordering -*/ -struct LessEndpoints +/** Functor to receive endpoints for a slot during handout. */ +class SlotHandouts { - bool operator() (Endpoint const& lhs, Endpoint const& rhs) const +public: + explicit SlotHandouts (SlotImp::ptr const& slot); + + bool full () const { - if (lhs.address.address() < rhs.address.address()) - return true; - if (lhs.address.address() > rhs.address.address()) - return false; - // Break ties by preferring higher hops - if (lhs.hops > rhs.hops) - return true; - return lhs.address.port () < rhs.address.port (); + return m_list.size() >= Tuning::numberOfEndpoints; } + + void insert (Endpoint const& ep) + { + m_list.push_back (ep); + } + + SlotImp::ptr const& slot () const + { + return m_slot; + } + + std::vector const& list() const + { + return m_list; + } + + bool try_insert (Endpoint const& ep); + +private: + SlotImp::ptr m_slot; + std::vector m_list; }; } diff --git a/src/ripple/peerfinder/impl/SlotImp.cpp b/src/ripple/peerfinder/impl/SlotImp.cpp new file mode 100644 index 0000000000..592cc07c44 --- /dev/null +++ b/src/ripple/peerfinder/impl/SlotImp.cpp @@ -0,0 +1,134 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "SlotImp.h" + +namespace ripple { +namespace PeerFinder { + +SlotImp::SlotImp (IP::Endpoint const& local_endpoint, + IP::Endpoint const& remote_endpoint, bool fixed, + clock_type& clock) + : recent (clock) + , m_inbound (true) + , m_fixed (fixed) + , m_cluster (false) + , m_state (accept) + , m_remote_endpoint (remote_endpoint) + , m_local_endpoint (local_endpoint) + , checked (false) + , canAccept (false) + , connectivityCheckInProgress (false) +{ +} + +SlotImp::SlotImp (IP::Endpoint const& remote_endpoint, + bool fixed, clock_type& clock) + : recent (clock) + , m_inbound (false) + , m_fixed (fixed) + , m_cluster (false) + , m_state (connect) + , m_remote_endpoint (remote_endpoint) + , checked (true) + , canAccept (true) + , connectivityCheckInProgress (false) +{ +} + +void +SlotImp::state (State state_) +{ + // Must go through activate() to set active state + assert (state_ != active); + + // The state must be different + assert (state_ != m_state); + + // You can't transition into the initial states + assert (state_ != accept && state_ != connect); + + // Can only become connected from outbound connect state + assert (state_ != connected || (! m_inbound && m_state == connect)); + + // Can't gracefully close on an outbound connection attempt + assert (state_ != closing || m_state != connect); + + m_state = state_; +} + +void +SlotImp::activate (clock_type::time_point const& now) +{ + // Can only become active from the accept or connected state + assert (m_state == accept || m_state == connected); + + m_state = active; + whenAcceptEndpoints = now; +} + +//------------------------------------------------------------------------------ + +Slot::~Slot () +{ +} + +//------------------------------------------------------------------------------ + +SlotImp::recent_t::recent_t (clock_type& clock) + : cache (clock) +{ +} + +void +SlotImp::recent_t::insert (IP::Endpoint const& ep, int hops) +{ + auto const result (cache.emplace (ep, hops)); + if (! result.second) + { + // NOTE Other logic depends on this <= inequality. + if (hops <= result.first->second) + { + result.first->second = hops; + cache.touch (result.first); + } + } +} + +bool +SlotImp::recent_t::filter (IP::Endpoint const& ep, int hops) +{ + auto const iter (cache.find (ep)); + if (iter == cache.end()) + return false; + // We avoid sending an endpoint if we heard it + // from them recently at the same or lower hop count. + // NOTE Other logic depends on this <= inequality. + return iter->second <= hops; +} + +void +SlotImp::recent_t::expire () +{ + beast::expire (cache, + Tuning::liveCacheSecondsToLive); +} + +} +} diff --git a/src/ripple/peerfinder/impl/SlotImp.h b/src/ripple/peerfinder/impl/SlotImp.h index 7fedeeaf27..5294097bbe 100644 --- a/src/ripple/peerfinder/impl/SlotImp.h +++ b/src/ripple/peerfinder/impl/SlotImp.h @@ -22,6 +22,9 @@ #include "../api/Slot.h" +#include "../../../beast/beast/container/aged_unordered_map.h" +#include "../../../beast/beast/container/aged_container_utility.h" + #include namespace ripple { @@ -29,40 +32,20 @@ namespace PeerFinder { class SlotImp : public Slot { +private: + typedef beast::aged_unordered_map recent_type; + public: typedef std::shared_ptr ptr; // inbound SlotImp (IP::Endpoint const& local_endpoint, - IP::Endpoint const& remote_endpoint, bool fixed) - : m_inbound (true) - , m_fixed (fixed) - , m_cluster (false) - , m_state (accept) - , m_remote_endpoint (remote_endpoint) - , m_local_endpoint (local_endpoint) - , checked (false) - , canAccept (false) - , connectivityCheckInProgress (false) - { - } + IP::Endpoint const& remote_endpoint, bool fixed, + clock_type& clock); // outbound - SlotImp (IP::Endpoint const& remote_endpoint, bool fixed) - : m_inbound (false) - , m_fixed (fixed) - , m_cluster (false) - , m_state (connect) - , m_remote_endpoint (remote_endpoint) - , checked (true) - , canAccept (true) - , connectivityCheckInProgress (false) - { - } - - ~SlotImp () - { - } + SlotImp (IP::Endpoint const& remote_endpoint, + bool fixed, clock_type& clock); bool inbound () const { @@ -99,38 +82,6 @@ public: return m_public_key; } - //-------------------------------------------------------------------------- - - void state (State state_) - { - // Must go through activate() to set active state - assert (state_ != active); - - // The state must be different - assert (state_ != m_state); - - // You can't transition into the initial states - assert (state_ != accept && state_ != connect); - - // Can only become connected from outbound connect state - assert (state_ != connected || (! m_inbound && m_state == connect)); - - // Can't gracefully close on an outbound connection attempt - assert (state_ != closing || m_state != connect); - - m_state = state_; - } - - void activate (clock_type::time_point const& now) - { - // Can only become active from the accept or connected state - assert (m_state == accept || m_state == connected); - - m_state = active; - whenSendEndpoints = now; - whenAcceptEndpoints = now; - } - void local_endpoint (IP::Endpoint const& endpoint) { m_local_endpoint = endpoint; @@ -151,6 +102,43 @@ public: m_cluster = cluster_; } + //-------------------------------------------------------------------------- + + void state (State state_); + + void activate (clock_type::time_point const& now); + + // "Memberspace" + // + // The set of all recent addresses that we have seen from this peer. + // We try to avoid sending a peer the same addresses they gave us. + // + class recent_t + { + public: + explicit recent_t (clock_type& clock); + + /** Called for each valid endpoint received for a slot. + We also insert messages that we send to the slot to prevent + sending a slot the same address too frequently. + */ + void insert (IP::Endpoint const& ep, int hops); + + /** Returns `true` if we should not send endpoint to the slot. */ + bool filter (IP::Endpoint const& ep, int hops); + + private: + void expire (); + + friend class SlotImp; + recent_type cache; + } recent; + + void expire() + { + recent.expire(); + } + private: bool const m_inbound; bool const m_fixed; @@ -175,27 +163,13 @@ public: // progress. Valid always. bool connectivityCheckInProgress; - // The time after which we will send the peer mtENDPOINTS - clock_type::time_point whenSendEndpoints; - // The time after which we will accept mtENDPOINTS from the peer // This is to prevent flooding or spamming. Receipt of mtENDPOINTS // sooner than the allotted time should impose a load charge. // clock_type::time_point whenAcceptEndpoints; - - // The set of all recent IP::Endpoint that we have seen from this peer. - // We try to avoid sending a peer the same addresses they gave us. - // - //std::set received; }; -//------------------------------------------------------------------------------ - -Slot::~Slot () -{ -} - } } diff --git a/src/ripple/peerfinder/impl/Store.h b/src/ripple/peerfinder/impl/Store.h index 7931adbe38..5881d989f8 100644 --- a/src/ripple/peerfinder/impl/Store.h +++ b/src/ripple/peerfinder/impl/Store.h @@ -29,18 +29,17 @@ class Store public: virtual ~Store () { } - struct SavedBootstrapAddress + // load the bootstrap cache + typedef std::function load_callback; + virtual std::size_t load (load_callback const& cb) = 0; + + // save the bootstrap cache + struct Entry { - IP::Endpoint address; - std::chrono::seconds cumulativeUptime; - int connectionValence; + IP::Endpoint endpoint; + int valence; }; - - virtual std::vector - loadBootstrapCache () = 0; - - virtual void updateBootstrapCache ( - std::vector const& list) = 0; + virtual void save (std::vector const& v) = 0; }; } diff --git a/src/ripple/peerfinder/impl/StoreSqdb.h b/src/ripple/peerfinder/impl/StoreSqdb.h index c15e945499..e7457a1da7 100644 --- a/src/ripple/peerfinder/impl/StoreSqdb.h +++ b/src/ripple/peerfinder/impl/StoreSqdb.h @@ -36,7 +36,7 @@ public: enum { // This determines the on-database format of the data - currentSchemaVersion = 3 + currentSchemaVersion = 4 }; explicit StoreSqdb (Journal journal = Journal()) @@ -54,136 +54,95 @@ public: m_journal.info << "Opening database at '" << file.getFullPathName() << "'"; - if (!error) + if (! error) error = init (); - if (!error) + if (! error) error = update (); return error; } - // Loads the entire stored bootstrap cache and returns it as an array. + // Loads the bootstrap cache, calling the callback for each entry // - std::vector loadBootstrapCache () + std::size_t load (load_callback const& cb) { - std::vector list; - + std::size_t n (0); Error error; + std::string s; + int valence; + sqdb::statement st = (m_session.prepare << + "SELECT " + " address, " + " valence " + "FROM PeerFinder_BootstrapCache " + , sqdb::into (s) + , sqdb::into (valence) + ); - // Get the count - std::size_t count; - if (! error) + if (st.execute_and_fetch (error)) { - m_session.once (error) << - "SELECT COUNT(*) FROM PeerFinder_BootstrapCache " - ,sqdb::into (count) - ; - } - - if (error) - { - report (error, __FILE__, __LINE__); - return list; - } - - list.reserve (count); - - { - std::string s; - std::chrono::seconds::rep uptimeSeconds; - int connectionValence; - - sqdb::statement st = (m_session.prepare << - "SELECT " - " address, " - " uptime, " - " valence " - "FROM PeerFinder_BootstrapCache " - , sqdb::into (s) - , sqdb::into (uptimeSeconds) - , sqdb::into (connectionValence) - ); - - if (st.execute_and_fetch (error)) + do { - do + IP::Endpoint const endpoint ( + IP::Endpoint::from_string (s)); + + if (! is_unspecified (endpoint)) { - SavedBootstrapAddress entry; - - entry.address = IP::Endpoint::from_string (s); - - if (! is_unspecified (entry.address)) - { - entry.cumulativeUptime = std::chrono::seconds (uptimeSeconds); - entry.connectionValence = connectionValence; - - list.push_back (entry); - } - else - { - m_journal.error << - "Bad address string '" << s << "' in Bootcache table"; - } + cb (endpoint, valence); + ++n; + } + else + { + m_journal.error << + "Bad address string '" << s << "' in Bootcache table"; } - while (st.fetch (error)); } + while (st.fetch (error)); } if (error) - { report (error, __FILE__, __LINE__); - } - return list; + return n; } // Overwrites the stored bootstrap cache with the specified array. // - void updateBootstrapCache ( - std::vector const& list) + void save (std::vector const& v) { Error error; - sqdb::transaction tr (m_session); - m_session.once (error) << "DELETE FROM PeerFinder_BootstrapCache"; - if (! error) { std::string s; - std::chrono::seconds::rep uptimeSeconds; - int connectionValence; + int valence; sqdb::statement st = (m_session.prepare << "INSERT INTO PeerFinder_BootstrapCache ( " " address, " - " uptime, " " valence " ") VALUES ( " - " ?, ?, ? " + " ?, ? " ");" , sqdb::use (s) - , sqdb::use (uptimeSeconds) - , sqdb::use (connectionValence) + , sqdb::use (valence) ); - for (std::vector ::const_iterator iter ( - list.begin()); !error && iter != list.end(); ++iter) + for (auto const& e : v) { - s = to_string (iter->address); - uptimeSeconds = iter->cumulativeUptime.count (); - connectionValence = iter->connectionValence; - + s = to_string (e.endpoint); + valence = e.valence; st.execute_and_fetch (error); + if (error) + break; } } if (! error) - { error = tr.commit(); - } if (error) { @@ -203,7 +162,7 @@ public: // get version int version (0); - if (!error) + if (! error) { m_session.once (error) << "SELECT " @@ -223,35 +182,155 @@ public: } } - if (!error && version != currentSchemaVersion) - { - m_journal.info << - "Updating database to version " << currentSchemaVersion; - } - if (!error) { - if (version < 3) + if (version < currentSchemaVersion) + m_journal.info << + "Updating database to version " << currentSchemaVersion; + else if (version > currentSchemaVersion) + error.fail (__FILE__, __LINE__, + "The PeerFinder database version is higher than expected"); + } + + if (! error && version < 4) + { + // + // Remove the "uptime" column from the bootstrap table + // + + if (! error) + m_session.once (error) << + "CREATE TABLE IF NOT EXISTS PeerFinder_BootstrapCache_Next ( " + " id INTEGER PRIMARY KEY AUTOINCREMENT, " + " address TEXT UNIQUE NOT NULL, " + " valence INTEGER" + ");" + ; + + if (! error) + m_session.once (error) << + "CREATE INDEX IF NOT EXISTS " + " PeerFinder_BootstrapCache_Next_Index ON " + " PeerFinder_BootstrapCache_Next " + " ( address ); " + ; + + std::size_t count; + if (! error) + m_session.once (error) << + "SELECT COUNT(*) FROM PeerFinder_BootstrapCache " + ,sqdb::into (count) + ; + + std::vector list; + + if (! error) { - if (!error) - m_session.once (error) << - "DROP TABLE IF EXISTS LegacyEndpoints"; + list.reserve (count); + std::string s; + int valence; + sqdb::statement st = (m_session.prepare << + "SELECT " + " address, " + " valence " + "FROM PeerFinder_BootstrapCache " + , sqdb::into (s) + , sqdb::into (valence) + ); - if (!error) - m_session.once (error) << - "DROP TABLE IF EXISTS PeerFinderLegacyEndpoints"; - - if (!error) - m_session.once (error) << - "DROP TABLE IF EXISTS PeerFinder_LegacyEndpoints"; - - if (!error) - m_session.once (error) << - "DROP TABLE IF EXISTS PeerFinder_LegacyEndpoints_Index"; + if (st.execute_and_fetch (error)) + { + do + { + Store::Entry entry; + entry.endpoint = IP::Endpoint::from_string (s); + if (! is_unspecified (entry.endpoint)) + { + entry.valence = valence; + list.push_back (entry); + } + else + { + m_journal.error << + "Bad address string '" << s << "' in Bootcache table"; + } + } + while (st.fetch (error)); + } } + + if (! error) + { + std::string s; + int valence; + sqdb::statement st = (m_session.prepare << + "INSERT INTO PeerFinder_BootstrapCache_Next ( " + " address, " + " valence " + ") VALUES ( " + " ?, ?" + ");" + , sqdb::use (s) + , sqdb::use (valence) + ); + + for (auto iter (list.cbegin()); + !error && iter != list.cend(); ++iter) + { + s = to_string (iter->endpoint); + valence = iter->valence; + st.execute_and_fetch (error); + } + + } + + if (! error) + m_session.once (error) << + "DROP TABLE IF EXISTS PeerFinder_BootstrapCache"; + + if (! error) + m_session.once (error) << + "DROP INDEX IF EXISTS PeerFinder_BootstrapCache_Index"; + + if (! error) + m_session.once (error) << + "ALTER TABLE PeerFinder_BootstrapCache_Next " + " RENAME TO PeerFinder_BootstrapCache"; + + if (! error) + m_session.once (error) << + "CREATE INDEX IF NOT EXISTS " + " PeerFinder_BootstrapCache_Index ON PeerFinder_BootstrapCache " + " ( " + " address " + " ); " + ; } - if (!error) + if (! error && version < 3) + { + // + // Remove legacy endpoints from the schema + // + + if (! error) + m_session.once (error) << + "DROP TABLE IF EXISTS LegacyEndpoints"; + + if (! error) + m_session.once (error) << + "DROP TABLE IF EXISTS PeerFinderLegacyEndpoints"; + + if (! error) + m_session.once (error) << + "DROP TABLE IF EXISTS PeerFinder_LegacyEndpoints"; + + if (! error) + m_session.once (error) << + "DROP TABLE IF EXISTS PeerFinder_LegacyEndpoints_Index"; + } + + if (! error) { int const version (currentSchemaVersion); m_session.once (error) << @@ -264,7 +343,7 @@ public: ,sqdb::use(version); } - if (!error) + if (! error) error = tr.commit(); if (error) @@ -283,35 +362,27 @@ private: sqdb::transaction tr (m_session); if (! error) - { m_session.once (error) << "PRAGMA encoding=\"UTF-8\""; - } if (! error) - { m_session.once (error) << "CREATE TABLE IF NOT EXISTS SchemaVersion ( " " name TEXT PRIMARY KEY, " " version INTEGER" ");" ; - } if (! error) - { m_session.once (error) << "CREATE TABLE IF NOT EXISTS PeerFinder_BootstrapCache ( " " id INTEGER PRIMARY KEY AUTOINCREMENT, " " address TEXT UNIQUE NOT NULL, " - " uptime INTEGER," " valence INTEGER" ");" ; - } if (! error) - { m_session.once (error) << "CREATE INDEX IF NOT EXISTS " " PeerFinder_BootstrapCache_Index ON PeerFinder_BootstrapCache " @@ -319,12 +390,9 @@ private: " address " " ); " ; - } if (! error) - { error = tr.commit(); - } if (error) { diff --git a/src/ripple/peerfinder/impl/Tuning.h b/src/ripple/peerfinder/impl/Tuning.h index 4846460a9b..3b73bcecc3 100644 --- a/src/ripple/peerfinder/impl/Tuning.h +++ b/src/ripple/peerfinder/impl/Tuning.h @@ -58,18 +58,6 @@ enum /** The default value of Config::maxPeers. */ ,defaultMaxPeers = 21 - - //--------------------------------------------------------- - // - // LegacyEndpoints - // - //--------------------------------------------------------- - - // How many legacy endpoints to keep in our cache - ,legacyEndpointCacheSize = 1000 - - // How many cache mutations between each database update - ,legacyEndpointMutationsPerUpdate = 50 }; //------------------------------------------------------------------------------ @@ -111,21 +99,17 @@ static std::chrono::seconds const bootcacheCooldownTime (60); enum { // Drop incoming messages with hops greater than this number - maxHops = 10 + maxHops = 6 // How many Endpoint to send in each mtENDPOINTS - ,numberOfEndpoints = 10 + ,numberOfEndpoints = 2 * maxHops // The most Endpoint we will accept in mtENDPOINTS ,numberOfEndpointsMax = 20 - // The maximum number of hops that we allow. Peers farther - // away than this are dropped. - ,maxPeerHopCount = 10 - // The number of peers that we want by default, unless an // explicit value is set in the config file. - ,defaultMaxPeerCount = 20 + ,defaultMaxPeerCount = 21 /** Number of addresses we provide when redirecting. */ ,redirectEndpointCount = 10 @@ -136,7 +120,16 @@ static std::chrono::seconds const secondsPerMessage (5); // How long an Endpoint will stay in the cache // This should be a small multiple of the broadcast frequency -static std::chrono::seconds const liveCacheSecondsToLive (60); +static std::chrono::seconds const liveCacheSecondsToLive (30); + +// +// +// + +// How much time to wait before trying an outgoing address again. +// Note that we ignore the port for purposes of comparison. +static std::chrono::seconds const recentAttemptDuration (60); + } /** @} */ diff --git a/src/ripple/peerfinder/impl/handout.h b/src/ripple/peerfinder/impl/handout.h new file mode 100644 index 0000000000..89f14c6912 --- /dev/null +++ b/src/ripple/peerfinder/impl/handout.h @@ -0,0 +1,88 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_PEERFINDER_HANDOUT_H_INCLUDED +#define RIPPLE_PEERFINDER_HANDOUT_H_INCLUDED + +namespace ripple { +namespace PeerFinder { + +namespace detail { + +/** Tries to insert one object in the target. + When an item is handed out it is moved to the end of the container. + @return The number of objects handed out +*/ +// VFALCO TODO specialization that handles std::list for SequenceContainer +// using splice for optimization over erase/push_back +// +template +std::size_t handout_one (Target& t, HopContainer& h) +{ + assert (! t.full()); + for (auto hi (h.begin()); hi != h.end(); ++hi) + { + auto const& e (*hi); + if (t.try_insert (e)) + { + h.move_back (hi); + return 1; + } + } + return 0; +} + +} + +/** Distributes objects to targets according to business rules. + A best effort is made to evenly distribute items in the sequence + container list into the target sequence list. +*/ +template +void handout (TargetFwdIter first, TargetFwdIter last, + SeqFwdIter seq_first, SeqFwdIter seq_last) +{ + for (;;) + { + std::size_t n (0); + for (auto si (seq_first); si != seq_last; ++si) + { + auto c (*si); + bool all_full (true); + for (auto ti (first); ti != last; ++ti) + { + auto& t (*ti); + if (! t.full()) + { + n += detail::handout_one (t, c); + all_full = false; + } + } + if (all_full) + return; + } + if (! n) + break; + } +} + +} +} + +#endif diff --git a/src/ripple/peerfinder/ripple_peerfinder.cpp b/src/ripple/peerfinder/ripple_peerfinder.cpp index 2e18294dbd..84f3a8b123 100644 --- a/src/ripple/peerfinder/ripple_peerfinder.cpp +++ b/src/ripple/peerfinder/ripple_peerfinder.cpp @@ -56,38 +56,37 @@ using namespace beast; #endif #include "impl/PrivateTypes.h" - -# include "impl/Tuning.h" -# include "impl/Checker.h" +#include "impl/Tuning.h" +#include "impl/Checker.h" #include "impl/CheckerAdapter.h" -# include "impl/Sorts.h" -# include "impl/Giveaways.h" -# include "impl/Livecache.h" -# include "impl/SlotImp.h" -# include "impl/Counts.h" -# include "impl/Source.h" +#include "impl/Livecache.h" +#include "impl/SlotImp.h" +#include "impl/Counts.h" +#include "impl/Source.h" #include "impl/SourceStrings.h" -# include "impl/Store.h" -# include "impl/Bootcache.h" -//# include "impl/Peer.h" +#include "impl/Store.h" +#include "impl/Bootcache.h" #include "impl/StoreSqdb.h" -# include "impl/Reporting.h" +#include "impl/Reporting.h" #include "impl/Logic.h" +#include "impl/Bootcache.cpp" #include "impl/Checker.cpp" #include "impl/Config.cpp" +#include "impl/ConnectHandouts.cpp" #include "impl/Endpoint.cpp" #include "impl/Livecache.cpp" #include "impl/Manager.cpp" +#include "impl/RedirectHandouts.cpp" +#include "impl/SlotHandouts.cpp" +#include "impl/SlotImp.cpp" #include "impl/SourceStrings.cpp" -//#include "sim/sync_timer.h" - #include "sim/GraphAlgorithms.h" #include "sim/WrappedSink.h" -# include "sim/Predicates.h" -# include "sim/FunctionQueue.h" -# include "sim/Message.h" -# include "sim/NodeSnapshot.h" -# include "sim/Params.h" +#include "sim/Predicates.h" +#include "sim/FunctionQueue.h" +#include "sim/Message.h" +#include "sim/NodeSnapshot.h" +#include "sim/Params.h" #include "sim/Tests.cpp" diff --git a/src/ripple/peerfinder/sim/Tests.cpp b/src/ripple/peerfinder/sim/Tests.cpp index 690c5d2d67..bc4cf59944 100644 --- a/src/ripple/peerfinder/sim/Tests.cpp +++ b/src/ripple/peerfinder/sim/Tests.cpp @@ -189,7 +189,7 @@ public: , m_next_port (m_config.listening_endpoint.port() + 1) , m_logic (boost::in_place ( boost::ref (clock), boost::ref (*this), boost::ref (*this), boost::ref (*this), m_journal)) - , m_whenSweep (m_network.now() + Tuning::liveCacheSecondsToLive) + , m_when_expire (m_network.now() + std::chrono::seconds (1)) { logic().setConfig (m_config.config); logic().load (); @@ -285,10 +285,10 @@ public: logic().makeOutgoingConnections (); logic().sendEndpoints (); - if (m_network.now() >= m_whenSweep) + if (m_network.now() >= m_when_expire) { - logic().sweepCache(); - m_whenSweep = m_network.now() + Tuning::liveCacheSecondsToLive; + logic().expire(); + m_when_expire = m_network.now() + std::chrono::seconds (1); } m_livecache_history.emplace_back ( @@ -533,7 +533,7 @@ private: Journal m_journal; IP::Port m_next_port; boost::optional m_logic; - clock_type::time_point m_whenSweep; + clock_type::time_point m_when_expire; SavedBootstrapAddresses m_bootstrap_cache; }; diff --git a/src/ripple_overlay/api/Peer.h b/src/ripple_overlay/api/Peer.h index 1cc49705d4..ac5dedb3c1 100644 --- a/src/ripple_overlay/api/Peer.h +++ b/src/ripple_overlay/api/Peer.h @@ -64,8 +64,6 @@ public: virtual Json::Value json () = 0; - virtual bool isConnected () const = 0; - virtual bool isInCluster () const = 0; virtual std::string getClusterNodeName() const = 0; diff --git a/src/ripple_overlay/api/Peers.h b/src/ripple_overlay/api/Peers.h index 115904f63f..65d806e398 100644 --- a/src/ripple_overlay/api/Peers.h +++ b/src/ripple_overlay/api/Peers.h @@ -240,8 +240,6 @@ struct match_peer bool operator() (Peer::ref peer) const { - bassert(peer->isConnected()); - if(matchPeer && (peer.get () == matchPeer)) return true; @@ -285,8 +283,6 @@ struct peer_in_set bool operator() (Peer::ref peer) const { - bassert(peer->isConnected()); - if (peerSet.count (peer->getShortId ()) == 0) return false; diff --git a/src/ripple_overlay/impl/PeerImp.h b/src/ripple_overlay/impl/PeerImp.h index e199fdaae9..968f74216c 100644 --- a/src/ripple_overlay/impl/PeerImp.h +++ b/src/ripple_overlay/impl/PeerImp.h @@ -560,11 +560,6 @@ public: return ret; } - bool isConnected () const - { - // CHECKME should this be stateActive or something else? - return (m_state == stateActive) && !m_detaching; - } bool isInCluster () const { return m_clusterNode; diff --git a/src/ripple_overlay/impl/Peers.cpp b/src/ripple_overlay/impl/Peers.cpp index 91f0c1a198..9fc9342adf 100644 --- a/src/ripple_overlay/impl/Peers.cpp +++ b/src/ripple_overlay/impl/Peers.cpp @@ -316,7 +316,6 @@ public: void send (PeerFinder::Slot::ptr const& slot, std::vector const& endpoints) { - bassert (! endpoints.empty()); typedef std::vector List; protocol::TMEndpoints tm; for (List::const_iterator iter (endpoints.begin()); @@ -346,10 +345,7 @@ public: assert (iter != m_peers.end ()); PeerImp::ptr const peer (iter->second.lock()); assert (peer != nullptr); - // VFALCO TODO Why are we checking isConnected? - // That should not be needed - if (peer->isConnected()) - peer->sendPacket (msg, false); + peer->sendPacket (msg, false); } }