diff --git a/src/ripple/peerfinder/api/Config.h b/src/ripple/peerfinder/api/Config.h index ef63574fe..36bba9243 100644 --- a/src/ripple/peerfinder/api/Config.h +++ b/src/ripple/peerfinder/api/Config.h @@ -35,6 +35,9 @@ struct Config /** True if we want to accept incoming connections. */ bool wantIncoming; + /** True if we want to establish connections automatically */ + bool connectAutomatically; + uint16 listeningPort; std::string featureList; }; diff --git a/src/ripple/peerfinder/impl/Cache.h b/src/ripple/peerfinder/impl/Cache.h index 7357469cd..68fd4700e 100644 --- a/src/ripple/peerfinder/impl/Cache.h +++ b/src/ripple/peerfinder/impl/Cache.h @@ -17,8 +17,8 @@ */ //============================================================================== -#ifndef RIPPLE_PEERFINDER_ENDPOINTCACHE_H_INCLUDED -#define RIPPLE_PEERFINDER_ENDPOINTCACHE_H_INCLUDED +#ifndef RIPPLE_PEERFINDER_CACHE_H_INCLUDED +#define RIPPLE_PEERFINDER_CACHE_H_INCLUDED namespace ripple { namespace PeerFinder { @@ -33,23 +33,11 @@ private: Journal m_journal; - Table m_now; - Table m_prev; + Table m_endpoints; - // Refresh the existing entry with a new message - void refresh (CachedEndpoint& entry, Endpoint const& message) - { - entry.message.hops = std::min (entry.message.hops, message.hops); - - // Copy the other fields based on uptime - if (entry.message.uptimeMinutes < message.uptimeMinutes) - { - entry.message.incomingSlotsAvailable = message.incomingSlotsAvailable; - entry.message.incomingSlotsMax = message.incomingSlotsMax; - entry.message.uptimeMinutes = message.uptimeMinutes; - entry.message.featureList = message.featureList; - } - } + // Tracks all the cached endpoints stored in the endpoint table + // in oldest-to-newest order. The oldest item is at the head. + List m_list; public: explicit Cache (Journal journal) @@ -63,30 +51,67 @@ public: std::size_t size() const { - return m_now.size() + m_prev.size(); + return m_endpoints.size(); } // Cycle the tables - void cycle() + void cycle(DiscreteTime now) { - std::swap (m_now, m_prev); - m_now.clear(); + List ::iterator iter (m_list.begin()); + + while (iter != m_list.end()) + { + if (iter->whenExpires > now) + break; + + CachedEndpoint &ep (*iter); + + // We need to remove the entry from the list before + // we remove it from the table. + iter = m_list.erase(iter); + + m_journal.debug << "Cache entry for " << + ep.message.address << " expired."; + + m_endpoints.erase (ep.message.address); + } } // Insert or update an existing entry with the new message - void insert (Endpoint const& message) + void insert (Endpoint const& message, DiscreteTime now) { - Table::iterator iter (m_prev.find (message.address)); - if (iter != m_prev.end()) - { - } - else - { - std::pair result ( - m_now.emplace (message.address, message)); - if (!result.second) - refresh (result.first->second, message); + std::pair result ( + m_endpoints.emplace (message.address, CachedEndpoint(message, now))); + + if (!result.second) + { // There was already an entry for this endpoint. Update it. + CachedEndpoint& entry (result.first->second); + + entry.message.hops = std::min (entry.message.hops, message.hops); + + // Copy the other fields based on uptime + if (entry.message.uptimeMinutes < message.uptimeMinutes) + { + entry.message.incomingSlotsAvailable = message.incomingSlotsAvailable; + entry.message.incomingSlotsMax = message.incomingSlotsMax; + entry.message.uptimeMinutes = message.uptimeMinutes; + entry.message.featureList = message.featureList; + } + + entry.whenExpires = now + cacheSecondsToLive; + + // It must already be in the list. Remove it in preparation. + m_list.erase (m_list.iterator_to(entry)); } + + CachedEndpoint& entry (result.first->second); + + m_journal.debug << "Cache entry for " << message.address << + " is valid until " << entry.whenExpires << + " (" << entry.message.incomingSlotsAvailable << + "/" << entry.message.incomingSlotsMax << ")"; + + m_list.push_back (entry); } }; diff --git a/src/ripple/peerfinder/impl/CachedEndpoint.h b/src/ripple/peerfinder/impl/CachedEndpoint.h index 2ea0fae6a..ffa708456 100644 --- a/src/ripple/peerfinder/impl/CachedEndpoint.h +++ b/src/ripple/peerfinder/impl/CachedEndpoint.h @@ -23,14 +23,16 @@ namespace ripple { namespace PeerFinder { -struct CachedEndpoint +struct CachedEndpoint : public List::Node { - CachedEndpoint (Endpoint const& message_) + CachedEndpoint (Endpoint const& message_, DiscreteTime now) : message (message_) + , whenExpires(now + cacheSecondsToLive) { } Endpoint message; + DiscreteTime whenExpires; }; } diff --git a/src/ripple/peerfinder/impl/CheckerAdapter.h b/src/ripple/peerfinder/impl/CheckerAdapter.h index d7282a33b..f4258b2a6 100644 --- a/src/ripple/peerfinder/impl/CheckerAdapter.h +++ b/src/ripple/peerfinder/impl/CheckerAdapter.h @@ -23,6 +23,12 @@ namespace ripple { namespace PeerFinder { +// Ensures that all Logic member function entry points are +// called while holding a lock on the recursive mutex. +// +typedef ScopedWrapperContext < + RecursiveMutex, RecursiveMutex::ScopedLockType> SerializedContext; + /** Adapts a ServiceQueue to dispatch Checker handler completions. This lets the Logic have its Checker handler get dispatched on the ServiceQueue instead of an io_service thread. Otherwise, @@ -31,30 +37,40 @@ namespace PeerFinder { class CheckerAdapter : public Checker { private: + SerializedContext& m_context; ServiceQueue& m_queue; ScopedPointer m_checker; struct Handler { - ServiceQueue* m_queue; + SerializedContext& m_context; + ServiceQueue& m_queue; AbstractHandler m_handler; Handler ( + SerializedContext& context, ServiceQueue& queue, AbstractHandler handler) - : m_queue (&queue) + : m_context (context) + , m_queue (queue) , m_handler (handler) { } void operator() (Checker::Result result) { - m_queue->wrap (m_handler) (result); + // VFALCO TODO Fix this, it is surely wrong but + // this supposedly correct line doesn't compile + //m_queue.wrap (m_context.wrap (m_handler)) (result); + + // WRONG + m_queue.wrap (m_handler) (result); } }; public: - explicit CheckerAdapter (ServiceQueue& queue) - : m_queue (queue) + CheckerAdapter (SerializedContext& context, ServiceQueue& queue) + : m_context (context) + , m_queue (queue) , m_checker (Checker::New()) { } @@ -73,7 +89,8 @@ public: void async_test (IPEndpoint const& endpoint, AbstractHandler handler) { - m_checker->async_test (endpoint, Handler (m_queue, handler)); + m_checker->async_test (endpoint, Handler ( + m_context, m_queue, handler)); } }; diff --git a/src/ripple/peerfinder/impl/Config.cpp b/src/ripple/peerfinder/impl/Config.cpp index 02c48be30..1fe635d53 100644 --- a/src/ripple/peerfinder/impl/Config.cpp +++ b/src/ripple/peerfinder/impl/Config.cpp @@ -23,6 +23,7 @@ namespace PeerFinder { Config::Config () : maxPeerCount (20) , wantIncoming (false) + , connectAutomatically (false) , listeningPort (0) { } diff --git a/src/ripple/peerfinder/impl/LegacyEndpoint.h b/src/ripple/peerfinder/impl/LegacyEndpoint.h index 4480f36d3..36668c4ab 100644 --- a/src/ripple/peerfinder/impl/LegacyEndpoint.h +++ b/src/ripple/peerfinder/impl/LegacyEndpoint.h @@ -26,22 +26,25 @@ namespace PeerFinder { struct LegacyEndpoint { LegacyEndpoint () - : checked (false) + : whenInserted (0) + , lastGet(0) + ,checked (false) , canAccept (false) { } - LegacyEndpoint (IPEndpoint const& address_) + LegacyEndpoint (IPEndpoint const& address_, DiscreteTime now) : address (address_) - , whenInserted (RelativeTime::fromStartup()) + , whenInserted (now) + , lastGet(0) { } IPEndpoint address; // When we inserted the endpoint into the cache - RelativeTime mutable whenInserted; + DiscreteTime mutable whenInserted; // When we last used the endpoint for outging connection attempts - RelativeTime mutable lastGet; + DiscreteTime mutable lastGet; // True if we ever tried to connect bool mutable checked; diff --git a/src/ripple/peerfinder/impl/LegacyEndpointCache.h b/src/ripple/peerfinder/impl/LegacyEndpointCache.h index 5b9139b76..a9311f5b6 100644 --- a/src/ripple/peerfinder/impl/LegacyEndpointCache.h +++ b/src/ripple/peerfinder/impl/LegacyEndpointCache.h @@ -164,7 +164,7 @@ public: } /** Load the legacy endpoints cache from the database. */ - void load () + void load (DiscreteTime now) { typedef std::vector List; List list; @@ -173,7 +173,7 @@ public: for (List::const_iterator iter (list.begin()); iter != list.end(); ++iter) { - std::pair result (insert (*iter)); + std::pair result (insert (*iter, now)); if (result.second) ++n; } @@ -186,10 +186,10 @@ public: The return value provides a reference to the new or existing endpoint. The bool indicates whether or not the insertion took place. */ - std::pair insert (IPEndpoint const& address) + std::pair insert (IPEndpoint const& address, DiscreteTime now) { std::pair result ( - m_map.insert (LegacyEndpoint (address))); + m_map.insert (LegacyEndpoint (address, now))); if (m_map.size() > legacyEndpointCacheSize) prune(); if (result.second) @@ -224,13 +224,12 @@ public: Also updates the lastGet field of the LegacyEndpoint so we will avoid re-using the address until we have tried all the others. */ - void get (std::size_t n, std::vector & result) const + void get (std::size_t n, std::vector & result, DiscreteTime now) const { FlattenedList list (flatten()); std::random_shuffle (list.begin(), list.end()); std::sort (list.begin(), list.end(), GetLess()); n = std::min (n, list.size()); - RelativeTime const now (RelativeTime::fromStartup()); for (FlattenedList::iterator iter (list.begin()); n-- && iter!=list.end(); ++iter) { diff --git a/src/ripple/peerfinder/impl/Logic.h b/src/ripple/peerfinder/impl/Logic.h index 7d2d7d85f..bf2ab5b57 100644 --- a/src/ripple/peerfinder/impl/Logic.h +++ b/src/ripple/peerfinder/impl/Logic.h @@ -140,7 +140,7 @@ public: // void load () { - m_legacyCache.load(); + m_legacyCache.load (get_now()); } // Returns a suitable Endpoint representing us. @@ -172,10 +172,14 @@ public: return true; } - // Make outgoing connections to bring us up to desired out count + // If configured to make outgoing connections, do us in order + // to bring us up to desired out count. // void makeOutgoingConnections () { + if (m_config.connectAutomatically) + return; + std::vector list; if (m_slots.outDesired > m_slots.outboundCount) @@ -183,7 +187,7 @@ public: int const needed (std::min ( m_slots.outDesired - m_slots.outboundCount, int (maxAddressesPerAttempt))); - m_legacyCache.get (needed, list); + m_legacyCache.get (needed, list, get_now()); } if (m_fixedPeersConnected < m_fixedPeers.size()) @@ -200,10 +204,16 @@ public: } } -#if RIPPLE_USE_PEERFINDER if (! list.empty()) m_callback.connectPeerEndpoints (list); -#endif + } + + // Returns the number of seconds that have elapsed since some baseline + // event. + // + virtual DiscreteTime get_now() + { + return 0; } //-------------------------------------------------------------------------- @@ -245,16 +255,21 @@ public: m_sources.push_back (source); } - // Called periodically to cycle and age the varioous caches. + // Called periodically to cycle and age the various caches. // void cycleCache() { - m_cache.cycle(); + m_cache.cycle(get_now()); + for (Peers::iterator iter (m_peers.begin()); iter != m_peers.end(); ++iter) iter->received.cycle(); } + void onPeerConnecting () + { + } + // Called when a peer connection is established. // We are guaranteed that the PeerID is not already in our map. // but we are *NOT* guaranteed that the IP isn't. So we need @@ -269,7 +284,7 @@ public: std::pair result ( m_peers.insert ( - PeerInfo (id, address, inbound))); + PeerInfo (id, address, inbound, get_now()))); bassert (result.second); m_slots.addPeer (m_config, inbound); } @@ -349,9 +364,9 @@ public: { if (! m_peers.empty()) { - m_journal.trace << "Sending mtENDPOINTS"; + m_journal.trace << "Sending endpoints to our peers"; - RelativeTime const now (RelativeTime::fromStartup()); + DiscreteTime const now (get_now()); for (Peers::iterator iter (m_peers.begin()); iter != m_peers.end(); ++iter) @@ -361,7 +376,7 @@ public: { sendEndpoints (peer); peer.whenSendEndpoints = now + - RelativeTime (secondsPerMessage); + secondsPerMessage; } } } @@ -380,6 +395,9 @@ public: { PeerInfo const& peer (*iter); + // Mark that a check for this peer is finished. + peer.connectivityCheckInProgress = false; + if (! result.error) { peer.checked = true; @@ -418,13 +436,13 @@ public: Peers::iterator iter (m_peers.find (id)); bassert (iter != m_peers.end()); - RelativeTime const now (RelativeTime::fromStartup()); + DiscreteTime const now (get_now()); PeerInfo const& peer (*iter); pruneEndpoints (peer.address.to_string(), list); // Log at higher severity if this is the first time - m_journal.stream (peer.whenAcceptEndpoints.isZero() ? + m_journal.stream (peer.whenAcceptEndpoints == 0 ? Journal::kInfo : Journal::kTrace) << "Received " << list.size() << " endpoints from " << peer.address; @@ -439,6 +457,9 @@ public: m_callback.chargePeerLoadPenalty(id); } + m_journal.debug << "Peer " << peer.address << + " sent us " << list.size() << " endpoints."; + // Process each entry // int neighborCount (0); @@ -450,16 +471,27 @@ public: // Remember that this peer gave us this address peer.received.insert (message.address); + m_journal.debug << "Received peer " << message.address << + " at " << message.hops << " hops."; + if (message.hops == 0) { ++neighborCount; if (neighborCount == 1) { - if (! peer.checked) + if (peer.connectivityCheckInProgress) { + m_journal.warning << "Connectivity check for " << + message.address << "already in progress."; + } + else if (! peer.checked) + { + // Mark that a check for this peer is now in progress. + peer.connectivityCheckInProgress = true; + // Test the peer's listening port before // adding it to the cache for the first time. - // + // m_checker.async_test (message.address, bind ( &Logic::onCheckEndpoint, this, id, message.address, _1)); @@ -475,20 +507,20 @@ public: // listening test, else we silently drop their message // since their listening port is misconfigured. // - m_cache.insert (message); + m_cache.insert (message, get_now()); } } } else { - m_cache.insert (message); + m_cache.insert (message, get_now()); } } if (neighborCount > 1) { m_journal.warning << "Peer " << peer.address << - " sent " << neighborCount << " entries with hops=0"; + " sent " << neighborCount << " entries with hops=0"; // VFALCO TODO Should we apply load charges? } @@ -528,11 +560,13 @@ public: if (! results.error) { std::size_t newEntries (0); + DiscreteTime now (get_now()); + for (std::vector ::const_iterator iter (results.list.begin()); iter != results.list.end(); ++iter) { std::pair result ( - m_legacyCache.insert (*iter)); + m_legacyCache.insert (*iter, now)); if (result.second) ++newEntries; } @@ -558,8 +592,6 @@ public: if (result.error == boost::asio::error::operation_aborted) return; - RelativeTime const now (RelativeTime::fromStartup()); - if (! result.error) { if (result.canAccept) @@ -581,7 +613,7 @@ public: if (! validIPEndpoint (address)) return; std::pair result ( - m_legacyCache.insert (address)); + m_legacyCache.insert (address, get_now())); if (result.second) { // its new diff --git a/src/ripple/peerfinder/impl/LogicType.h b/src/ripple/peerfinder/impl/LogicType.h new file mode 100644 index 000000000..de90b23a1 --- /dev/null +++ b/src/ripple/peerfinder/impl/LogicType.h @@ -0,0 +1,53 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_PEERFINDER_LOGICTYPE_H_INCLUDED +#define RIPPLE_PEERFINDER_LOGICTYPE_H_INCLUDED + +namespace ripple { +namespace PeerFinder { + +/** Provides the Clock required by Logic's get_now(). + This allows the unit tests to provide its own manual clock. +*/ +template +class LogicType : public Logic +{ +public: + explicit LogicType (Callback& callback, + Store& store, + Checker& checker, + Journal journal) + : Logic (callback, store, checker, journal) + { + } + + DiscreteTime get_now () + { + return m_clock(); + } + +private: + Clock m_clock; +}; + +} +} + +#endif diff --git a/src/ripple/peerfinder/impl/Manager.cpp b/src/ripple/peerfinder/impl/Manager.cpp index e82fa50e7..b013ec000 100644 --- a/src/ripple/peerfinder/impl/Manager.cpp +++ b/src/ripple/peerfinder/impl/Manager.cpp @@ -185,20 +185,13 @@ public: ServiceQueue m_queue; Journal m_journal; StoreSqdb m_store; + SerializedContext m_context; CheckerAdapter m_checker; - Logic m_logic; + LogicType m_logic; DeadlineTimer m_connectTimer; DeadlineTimer m_messageTimer; DeadlineTimer m_cacheTimer; - // Ensures that all Logic member function entry points are - // called while holding a lock on the recursive mutex. - // - typedef ScopedWrapperContext < - RecursiveMutex, RecursiveMutex::ScopedLockType> SerializedContext; - - SerializedContext m_context; - //-------------------------------------------------------------------------- ManagerImp (Stoppable& stoppable, Callback& callback, Journal journal) @@ -206,7 +199,7 @@ public: , Thread ("PeerFinder") , m_journal (journal) , m_store (journal) - , m_checker (m_queue) + , m_checker (m_context, m_queue) , m_logic (callback, m_store, m_checker, journal) , m_connectTimer (this) , m_messageTimer (this) @@ -267,13 +260,20 @@ public: // VFALCO TODO This needs to be implemented } + void onPeerConnecting () + { + m_queue.dispatch ( + m_context.wrap ( + bind (&Logic::onPeerConnecting, &m_logic))); + } + void onPeerConnected (PeerID const& id, IPEndpoint const& address, bool incoming) { m_queue.dispatch ( m_context.wrap ( bind (&Logic::onPeerConnected, &m_logic, - id, address, incoming)));; + id, address, incoming))); } void onPeerDisconnected (const PeerID& id) @@ -383,8 +383,6 @@ public: File const file (File::getSpecialLocation ( File::userDocumentsDirectory).getChildFile ("PeerFinder.sqlite")); - m_journal.debug << "Opening database at '" << file.getFullPathName() << "'"; - Error error (m_store.open (file)); if (error) @@ -402,7 +400,9 @@ public: m_messageTimer.setExpiration (secondsPerMessage); m_cacheTimer.setExpiration (cacheSecondsToLive); - m_queue.post (bind (&Logic::makeOutgoingConnections, &m_logic)); + m_queue.post ( + m_context.wrap ( + bind (&Logic::makeOutgoingConnections, &m_logic))); } void run () diff --git a/src/ripple/peerfinder/impl/PeerInfo.h b/src/ripple/peerfinder/impl/PeerInfo.h index d3a48fcb5..70086d340 100644 --- a/src/ripple/peerfinder/impl/PeerInfo.h +++ b/src/ripple/peerfinder/impl/PeerInfo.h @@ -32,14 +32,16 @@ struct PeerInfo { PeerInfo (PeerID const& id_, IPEndpoint const& address_, - bool inbound_) + bool inbound_, + DiscreteTime now) : id (id_) , address (address_) , inbound (inbound_) , checked (inbound_ ? false : true) , canAccept (inbound_ ? false : true) - , whenSendEndpoints (RelativeTime::fromStartup()) - , whenAcceptEndpoints (RelativeTime::fromStartup()) + , connectivityCheckInProgress (false) + , whenSendEndpoints (now) + , whenAcceptEndpoints (now) { } @@ -52,17 +54,21 @@ struct PeerInfo bool mutable checked; // Set to indicate if the connection can receive incoming at the - // address advertised in mtENDPOINTS. Only valid if checked is true + // address advertised in mtENDPOINTS. Only valid if checked is true. bool mutable canAccept; + // Set to indicate that a connection check for this peer is in + // progress. Valid always. + bool mutable connectivityCheckInProgress; + // The time after which we will send the peer mtENDPOINTS - RelativeTime mutable whenSendEndpoints; + DiscreteTime mutable whenSendEndpoints; // The time after which we will accept mtENDPOINTS from the peer // This is to prevent flooding or spamming. Receipt of mtENDPOINTS // sooner than the allotted time should impose a load charge. // - RelativeTime mutable whenAcceptEndpoints; + DiscreteTime mutable whenAcceptEndpoints; // The set of all recent IPEndpoint that we have seen from this peer. // We try to avoid sending a peer the same addresses they gave us. diff --git a/src/ripple/peerfinder/impl/PrivateTypes.h b/src/ripple/peerfinder/impl/PrivateTypes.h new file mode 100644 index 000000000..820a7d031 --- /dev/null +++ b/src/ripple/peerfinder/impl/PrivateTypes.h @@ -0,0 +1,32 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_PEERFINDER_PRIVATE_TYPES_H_INCLUDED +#define RIPPLE_PEERFINDER_PRIVATE_TYPES_H_INCLUDED + +namespace ripple { +namespace PeerFinder { + +/** Time in seconds since some baseline event in the past. */ +typedef int DiscreteTime; + +} +} + +#endif diff --git a/src/ripple/peerfinder/impl/SimpleMonotonicClock.h b/src/ripple/peerfinder/impl/SimpleMonotonicClock.h new file mode 100644 index 000000000..c178ac1b5 --- /dev/null +++ b/src/ripple/peerfinder/impl/SimpleMonotonicClock.h @@ -0,0 +1,40 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_PEERFINDER_SIMPLEMONOTONICCLOCK_H_INCLUDED +#define RIPPLE_PEERFINDER_SIMPLEMONOTONICCLOCK_H_INCLUDED + +namespace ripple { +namespace PeerFinder { + +/** Monotonically increasing time value. */ +struct SimpleMonotonicClock +{ + typedef int value_type; + + value_type operator() () const + { + return value_type (RelativeTime::fromStartup().inSeconds()); + } +}; + +} +} + +#endif diff --git a/src/ripple/peerfinder/impl/Slots.cpp b/src/ripple/peerfinder/impl/Slots.cpp index 54b7148f4..8f111bc76 100644 --- a/src/ripple/peerfinder/impl/Slots.cpp +++ b/src/ripple/peerfinder/impl/Slots.cpp @@ -34,30 +34,25 @@ Slots::Slots () void Slots::update (Config const& config) { - if (! config.wantIncoming) - { - inboundSlots = 0; - inboundSlotsMaximum = 0; - } + double outDesiredFraction = 1; + + if (config.wantIncoming) + outDesiredFraction = config.maxPeerCount * (Config::outPercent * .01); + + if (m_roundUpwards) + outDesired = int (std::ceil (outDesiredFraction)); else - { - double outDesiredFraction ( - config.maxPeerCount * (Config::outPercent * .01)); + outDesired = int (std::floor (outDesiredFraction)); - if (m_roundUpwards) - outDesired = int (std::ceil (outDesiredFraction)); - else - outDesired = int (std::floor (outDesiredFraction)); - if (outDesired < Config::minOutCount) - outDesired = Config::minOutCount; + if (outDesired < Config::minOutCount) + outDesired = Config::minOutCount; - if (config.maxPeerCount >= outDesired) - inboundSlotsMaximum = config.maxPeerCount - outDesired; - else - inboundSlotsMaximum = 0; + if (config.maxPeerCount >= outDesired) + inboundSlotsMaximum = config.maxPeerCount - outDesired; + else + inboundSlotsMaximum = 0; - inboundSlots = std::max (inboundSlotsMaximum - inboundCount, 0); - } + inboundSlots = std::max (inboundSlotsMaximum - inboundCount, 0); } void Slots::addPeer (Config const& config, bool inbound) diff --git a/src/ripple/peerfinder/impl/StoreSqdb.h b/src/ripple/peerfinder/impl/StoreSqdb.h index 2af1fc696..afaae3717 100644 --- a/src/ripple/peerfinder/impl/StoreSqdb.h +++ b/src/ripple/peerfinder/impl/StoreSqdb.h @@ -24,13 +24,21 @@ namespace ripple { namespace PeerFinder { /** Database persistence for PeerFinder using SQLite */ -class StoreSqdb : public Store +class StoreSqdb + : public Store + , public LeakChecked { private: Journal m_journal; sqdb::session m_session; public: + enum + { + // This determines the on-database format of the data + currentSchemaVersion = 2 + }; + explicit StoreSqdb (Journal journal = Journal()) : m_journal (journal) { @@ -44,9 +52,14 @@ public: { Error error (m_session.open (file.getFullPathName ())); + m_journal.info << "Opening database at '" << file.getFullPathName() << "'"; + if (!error) error = init (); + if (!error) + error = update (); + return error; } @@ -62,7 +75,7 @@ public: if (! error) { m_session.once (error) << - "SELECT COUNT(*) FROM LegacyEndpoints " + "SELECT COUNT(*) FROM PeerFinder_LegacyEndpoints " ,sqdb::into (count) ; } @@ -78,7 +91,7 @@ public: { std::string s; sqdb::statement st = (m_session.prepare << - "SELECT ipv4 FROM LegacyEndpoints " + "SELECT ipv4 FROM PeerFinder_LegacyEndpoints " ,sqdb::into (s) ); @@ -110,13 +123,13 @@ public: sqdb::transaction tr (m_session); m_session.once (error) << - "DELETE FROM LegacyEndpoints"; + "DELETE FROM PeerFinder_LegacyEndpoints"; if (! error) { std::string s; sqdb::statement st = (m_session.prepare << - "INSERT INTO LegacyEndpoints ( " + "INSERT INTO PeerFinder_LegacyEndpoints ( " " ipv4 " ") VALUES ( " " ? " @@ -145,6 +158,79 @@ public: } } + // Convert any existing entries from an older schema to the + // current one, if approrpriate. + // + Error update () + { + Error error; + + sqdb::transaction tr (m_session); + + // get version + int version (0); + if (!error) + { + m_session.once (error) << + "SELECT " + " version " + "FROM SchemaVersion WHERE " + " name = 'PeerFinder'" + ,sqdb::into (version) + ; + + if (! error) + { + if (!m_session.got_data()) + version = 0; + + m_journal.info << "Opened version " << version << " database"; + } + } + + if (!error && version != currentSchemaVersion) + { + m_journal.info << + "Updateding database to version " << currentSchemaVersion; + } + + if (!error && (version < 2)) + { + if (!error) + m_session.once (error) << + "DROP TABLE IF EXISTS LegacyEndpoints"; + + if (!error) + m_session.once (error) << + "DROP TABLE IF EXISTS PeerFinderLegacyEndpoints"; + } + + if (!error) + { + int const version (currentSchemaVersion); + m_session.once (error) << + "INSERT OR REPLACE INTO SchemaVersion (" + " name " + " ,version " + ") VALUES ( " + " 'PeerFinder', ? " + ")" + ,sqdb::use(version); + } + + if (!error) + error = tr.commit(); + + if (error) + { + tr.rollback(); + report (error, __FILE__, __LINE__); + } + + return error; + } + + private: Error init () { @@ -160,13 +246,34 @@ private: if (! error) { m_session.once (error) << - "CREATE TABLE IF NOT EXISTS LegacyEndpoints ( " + "CREATE TABLE IF NOT EXISTS SchemaVersion ( " + " name TEXT PRIMARY KEY, " + " version INTEGER" + ");" + ; + } + + if (! error) + { + m_session.once (error) << + "CREATE TABLE IF NOT EXISTS PeerFinder_LegacyEndpoints ( " " id INTEGER PRIMARY KEY AUTOINCREMENT, " " ipv4 TEXT UNIQUE NOT NULL " ");" ; } + if (! error) + { + m_session.once (error) << + "CREATE INDEX IF NOT EXISTS " + " PeerFinder_LegacyEndpoints_Index ON PeerFinder_LegacyEndpoints " + " ( " + " ipv4 " + " ); " + ; + } + if (! error) { error = tr.commit(); diff --git a/src/ripple/peerfinder/ripple_peerfinder.cpp b/src/ripple/peerfinder/ripple_peerfinder.cpp index 52852d599..51b18d5e1 100644 --- a/src/ripple/peerfinder/ripple_peerfinder.cpp +++ b/src/ripple/peerfinder/ripple_peerfinder.cpp @@ -42,6 +42,8 @@ namespace ripple { using namespace beast; } +#include "impl/SimpleMonotonicClock.h" +#include "impl/PrivateTypes.h" # include "impl/Tuning.h" # include "impl/Checker.h" #include "impl/CheckerAdapter.h" @@ -56,6 +58,7 @@ using namespace beast; # include "impl/PeerInfo.h" #include "impl/StoreSqdb.h" #include "impl/Logic.h" +#include "impl/LogicType.h" #include "impl/Checker.cpp" #include "impl/Config.cpp" diff --git a/src/ripple_app/peers/Peers.cpp b/src/ripple_app/peers/Peers.cpp index 0ebc8a48c..183919857 100644 --- a/src/ripple_app/peers/Peers.cpp +++ b/src/ripple_app/peers/Peers.cpp @@ -127,6 +127,12 @@ public: if (config.wantIncoming) config.listeningPort = getConfig().peerListeningPort; + // if it's a private peer or we are running as standalone + // automatic connections would defeat the purpose. + config.connectAutomatically = + !getConfig().RUN_STANDALONE && + !getConfig().PEER_PRIVATE; + config.featureList = ""; m_peerFinder->setConfig (config);