From bb29c8ba8549c4b4b8a59d5765fa3b8c9d0d162a Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Fri, 4 Oct 2013 19:11:10 -0700 Subject: [PATCH] Add PeerFinder::Checker for testing endpoints --- Builds/VisualStudio2012/RippleD.vcxproj | 12 + .../VisualStudio2012/RippleD.vcxproj.filters | 21 ++ src/ripple/peerfinder/api/Callback.h | 3 - src/ripple/peerfinder/api/Endpoint.h | 2 - src/ripple/peerfinder/impl/Checker.cpp | 183 ++++++++++++++ src/ripple/peerfinder/impl/Checker.h | 89 +++++++ src/ripple/peerfinder/impl/CheckerAdapter.h | 83 +++++++ src/ripple/peerfinder/impl/Config.cpp | 2 - src/ripple/peerfinder/impl/EndpointCache.h | 4 - src/ripple/peerfinder/impl/LegacyEndpoint.h | 52 ++++ .../peerfinder/impl/LegacyEndpointCache.h | 125 ++++++++++ src/ripple/peerfinder/impl/Logic.h | 232 +++++++++++------- src/ripple/peerfinder/impl/Manager.cpp | 11 +- src/ripple/peerfinder/impl/PeerInfo.h | 16 +- src/ripple/peerfinder/impl/Slots.h | 2 - src/ripple/peerfinder/impl/SourceStrings.cpp | 2 - src/ripple/peerfinder/impl/SourceStrings.h | 2 - src/ripple/peerfinder/impl/StoreSqdb.h | 4 - src/ripple/peerfinder/impl/Tests.cpp | 2 - src/ripple/peerfinder/impl/Tuning.h | 51 ++++ src/ripple/peerfinder/ripple_peerfinder.cpp | 36 ++- src/ripple/peerfinder/ripple_peerfinder.h | 6 +- src/ripple/types/api/CycledSet.h | 110 +++++++++ src/ripple/types/ripple_types.h | 2 + src/ripple/validators/impl/Tuning.h | 88 ------- 25 files changed, 919 insertions(+), 221 deletions(-) create mode 100644 src/ripple/peerfinder/impl/Checker.cpp create mode 100644 src/ripple/peerfinder/impl/Checker.h create mode 100644 src/ripple/peerfinder/impl/CheckerAdapter.h create mode 100644 src/ripple/peerfinder/impl/LegacyEndpoint.h create mode 100644 src/ripple/peerfinder/impl/LegacyEndpointCache.h create mode 100644 src/ripple/peerfinder/impl/Tuning.h create mode 100644 src/ripple/types/api/CycledSet.h diff --git a/Builds/VisualStudio2012/RippleD.vcxproj b/Builds/VisualStudio2012/RippleD.vcxproj index 23888ca8b3..ae525e03d0 100644 --- a/Builds/VisualStudio2012/RippleD.vcxproj +++ b/Builds/VisualStudio2012/RippleD.vcxproj @@ -72,6 +72,12 @@ true + + true + true + true + true + true true @@ -1635,7 +1641,11 @@ + + + + @@ -1643,6 +1653,7 @@ + @@ -1672,6 +1683,7 @@ + diff --git a/Builds/VisualStudio2012/RippleD.vcxproj.filters b/Builds/VisualStudio2012/RippleD.vcxproj.filters index 2524b6d48e..de94657228 100644 --- a/Builds/VisualStudio2012/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2012/RippleD.vcxproj.filters @@ -1089,6 +1089,9 @@ [1] Ripple\json\impl + + [1] Ripple\peerfinder\impl + @@ -2196,6 +2199,24 @@ [1] Ripple\validators\impl + + [1] Ripple\peerfinder\impl + + + [1] Ripple\peerfinder\impl + + + [1] Ripple\peerfinder\impl + + + [1] Ripple\types\api + + + [1] Ripple\peerfinder\impl + + + [1] Ripple\peerfinder\impl + diff --git a/src/ripple/peerfinder/api/Callback.h b/src/ripple/peerfinder/api/Callback.h index 121b9acf59..8cfdb7d94d 100644 --- a/src/ripple/peerfinder/api/Callback.h +++ b/src/ripple/peerfinder/api/Callback.h @@ -20,9 +20,6 @@ #ifndef RIPPLE_PEERFINDER_CALLBACK_H_INCLUDED #define RIPPLE_PEERFINDER_CALLBACK_H_INCLUDED -#include "Endpoint.h" -#include "Types.h" - namespace ripple { namespace PeerFinder { diff --git a/src/ripple/peerfinder/api/Endpoint.h b/src/ripple/peerfinder/api/Endpoint.h index b1adc3a7f7..074ce9f0c3 100644 --- a/src/ripple/peerfinder/api/Endpoint.h +++ b/src/ripple/peerfinder/api/Endpoint.h @@ -20,8 +20,6 @@ #ifndef RIPPLE_PEERFINDER_ENDPOINT_H_INCLUDED #define RIPPLE_PEERFINDER_ENDPOINT_H_INCLUDED -#include "Types.h" - namespace ripple { namespace PeerFinder { diff --git a/src/ripple/peerfinder/impl/Checker.cpp b/src/ripple/peerfinder/impl/Checker.cpp new file mode 100644 index 0000000000..f791cf1bdb --- /dev/null +++ b/src/ripple/peerfinder/impl/Checker.cpp @@ -0,0 +1,183 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +namespace ripple { +namespace PeerFinder { + +class CheckerImp + : public Checker + , private Thread + , private LeakChecked +{ +private: + class Request; + + struct State + { + List list; + }; + + typedef SharedData SharedState; + + SharedState m_state; + boost::asio::io_service m_io_service; + boost::optional m_work; + + //-------------------------------------------------------------------------- + + static boost::asio::ip::tcp::endpoint fromIPEndpoint ( + IPEndpoint const& ipEndpoint) + { + if (ipEndpoint.isV4 ()) + { + return boost::asio::ip::tcp::endpoint ( + boost::asio::ip::address_v4 ( + ipEndpoint.v4().value), + ipEndpoint.port ()); + } + bassertfalse; + return boost::asio::ip::tcp::endpoint (); + } + + //-------------------------------------------------------------------------- + + class Request + : public SharedObject + , public List ::Node + , private LeakChecked + { + public: + typedef SharedPtr Ptr; + typedef boost::asio::ip::tcp Protocol; + typedef boost::system::error_code error_code; + typedef Protocol::socket socket_type; + typedef Protocol::endpoint endpoint_type; + + CheckerImp& m_owner; + boost::asio::io_service& m_io_service; + IPEndpoint m_address; + AbstractHandler m_handler; + socket_type m_socket; + boost::system::error_code m_error; + bool m_canAccept; + + Request (CheckerImp& owner, boost::asio::io_service& io_service, + IPEndpoint const& address, AbstractHandler handler) + : m_owner (owner) + , m_io_service (io_service) + , m_address (address) + , m_handler (handler) + , m_socket (m_io_service) + , m_canAccept (false) + { + m_owner.add (*this); + + m_socket.async_connect (fromIPEndpoint (m_address), + wrapHandler (boost::bind (&Request::handle_connect, Ptr(this), + boost::asio::placeholders::error), m_handler)); + } + + ~Request () + { + Result result; + result.address = m_address; + result.error = m_error; + m_io_service.wrap (m_handler) (result); + + m_owner.remove (*this); + } + + void cancel () + { + m_socket.cancel(); + } + + void handle_connect (boost::system::error_code ec) + { + m_error = ec; + if (ec) + return; + + m_canAccept = true; + } + }; + + //-------------------------------------------------------------------------- + + void add (Request& request) + { + SharedState::Access state (m_state); + state->list.push_back (request); + } + + void remove (Request& request) + { + SharedState::Access state (m_state); + state->list.erase (state->list.iterator_to (request)); + } + + void run () + { + m_io_service.run (); + } + +public: + CheckerImp () + : Thread ("PeerFinder::Checker") + , m_work (boost::in_place (boost::ref (m_io_service))) + { + startThread (); + } + + ~CheckerImp () + { + // cancel pending i/o + cancel(); + + // destroy the io_service::work object + m_work = boost::none; + + // signal and wait for the thread to exit gracefully + stopThread (); + } + + void cancel () + { + SharedState::Access state (m_state); + for (List ::iterator iter (state->list.begin()); + iter != state->list.end(); ++iter) + iter->cancel(); + } + + void async_test (IPEndpoint const& endpoint, + AbstractHandler handler) + { + new Request (*this, m_io_service, endpoint, handler); + } +}; + +//------------------------------------------------------------------------------ + +Checker* Checker::New () +{ + return new CheckerImp; +} + +} +} diff --git a/src/ripple/peerfinder/impl/Checker.h b/src/ripple/peerfinder/impl/Checker.h new file mode 100644 index 0000000000..cf85ea825c --- /dev/null +++ b/src/ripple/peerfinder/impl/Checker.h @@ -0,0 +1,89 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_PEERFINDER_CHECKER_H_INCLUDED +#define RIPPLE_PEERFINDER_CHECKER_H_INCLUDED + +namespace ripple { +namespace PeerFinder { + +/** Tests remote listening sockets to make sure they are connectible. */ +class Checker +{ +public: + /** Create the service. + This will automatically start the associated thread and io_service. + */ + static Checker* New (); + + /** Destroy the service. + Any pending I/O operations will be canceled. This call blocks until + all pending operations complete (either with success or with + operation_aborted) and the associated thread and io_service have + no more work remaining. + */ + virtual ~Checker () { } + + /** Cancel pending I/O. + This issues cancel orders for all pending I/O operations and then + returns immediately. Handlers will receive operation_aborted errors, + or if they were already queued they will complete normally. + */ + virtual void cancel () = 0; + + struct Result + { + Result () + : canAccept (false) + { } + + /** The original address. */ + IPEndpoint address; + + /** The error code from the operation. */ + boost::system::error_code error; + + /** `true` if the endpoint is reachable, else `false`. + Only defined if no error occurred. + */ + bool canAccept; + }; + + /** Performs an async connection test on the specified endpoint. + The port must be non-zero. + Handler will be called with this signature: + void (Result const& result); + */ + template + void async_test (IPEndpoint const& endpoint, + BEAST_MOVE_ARG(Handler) handler) + { + async_test (endpoint, + AbstractHandler ( + BEAST_MOVE_CAST(Handler)(handler))); + } + + virtual void async_test (IPEndpoint const& endpoint, + AbstractHandler handler) = 0; +}; + +} +} + +#endif diff --git a/src/ripple/peerfinder/impl/CheckerAdapter.h b/src/ripple/peerfinder/impl/CheckerAdapter.h new file mode 100644 index 0000000000..d7282a33ba --- /dev/null +++ b/src/ripple/peerfinder/impl/CheckerAdapter.h @@ -0,0 +1,83 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_PEERFINDER_CHECKERADAPTER_H_INCLUDED +#define RIPPLE_PEERFINDER_CHECKERADAPTER_H_INCLUDED + +namespace ripple { +namespace PeerFinder { + +/** Adapts a ServiceQueue to dispatch Checker handler completions. + This lets the Logic have its Checker handler get dispatched + on the ServiceQueue instead of an io_service thread. Otherwise, + Logic would need a ServiceQueue to dispatch from its handler. +*/ +class CheckerAdapter : public Checker +{ +private: + ServiceQueue& m_queue; + ScopedPointer m_checker; + + struct Handler + { + ServiceQueue* m_queue; + AbstractHandler m_handler; + + Handler ( + ServiceQueue& queue, + AbstractHandler handler) + : m_queue (&queue) + , m_handler (handler) + { } + + void operator() (Checker::Result result) + { + m_queue->wrap (m_handler) (result); + } + }; + +public: + explicit CheckerAdapter (ServiceQueue& queue) + : m_queue (queue) + , m_checker (Checker::New()) + { + } + + ~CheckerAdapter () + { + // Have to do this before other fields get destroyed + m_checker = nullptr; + } + + void cancel () + { + m_checker->cancel(); + } + + void async_test (IPEndpoint const& endpoint, + AbstractHandler handler) + { + m_checker->async_test (endpoint, Handler (m_queue, handler)); + } +}; + +} +} + +#endif diff --git a/src/ripple/peerfinder/impl/Config.cpp b/src/ripple/peerfinder/impl/Config.cpp index d3e5b6e732..02c48be30b 100644 --- a/src/ripple/peerfinder/impl/Config.cpp +++ b/src/ripple/peerfinder/impl/Config.cpp @@ -17,8 +17,6 @@ */ //============================================================================== -#include "../api/Config.h" - namespace ripple { namespace PeerFinder { diff --git a/src/ripple/peerfinder/impl/EndpointCache.h b/src/ripple/peerfinder/impl/EndpointCache.h index 6463cafe00..f3ae74b5c1 100644 --- a/src/ripple/peerfinder/impl/EndpointCache.h +++ b/src/ripple/peerfinder/impl/EndpointCache.h @@ -20,10 +20,6 @@ #ifndef RIPPLE_PEERFINDER_ENDPOINTCACHE_H_INCLUDED #define RIPPLE_PEERFINDER_ENDPOINTCACHE_H_INCLUDED -#include "../../ripple/types/api/AgedHistory.h" - -#include "../api/Types.h" - namespace ripple { namespace PeerFinder { diff --git a/src/ripple/peerfinder/impl/LegacyEndpoint.h b/src/ripple/peerfinder/impl/LegacyEndpoint.h new file mode 100644 index 0000000000..2d1632597a --- /dev/null +++ b/src/ripple/peerfinder/impl/LegacyEndpoint.h @@ -0,0 +1,52 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_PEERFINDER_LEGACYENDPOINT_H_INCLUDED +#define RIPPLE_PEERFINDER_LEGACYENDPOINT_H_INCLUDED + +namespace ripple { +namespace PeerFinder { + +struct LegacyEndpoint +{ + LegacyEndpoint () + : checked (false) + , canAccept (false) + { } + + LegacyEndpoint (IPEndpoint const& address_) + : address (address_) + { } + + IPEndpoint address; + + // When we last gave the endpoint out for connection attempts + RelativeTime mutable lastGet; + + // True if we ever tried to connect + bool mutable checked; + + // The result of the last connect attempt + bool mutable canAccept; +}; + +} +} + +#endif diff --git a/src/ripple/peerfinder/impl/LegacyEndpointCache.h b/src/ripple/peerfinder/impl/LegacyEndpointCache.h new file mode 100644 index 0000000000..7971bc5b54 --- /dev/null +++ b/src/ripple/peerfinder/impl/LegacyEndpointCache.h @@ -0,0 +1,125 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_PEERFINDER_LEGACYENDPOINTCACHE_H_INCLUDED +#define RIPPLE_PEERFINDER_LEGACYENDPOINTCACHE_H_INCLUDED + +namespace ripple { +namespace PeerFinder { + +/** A container for managing the cache of legacy endpoints. */ +class LegacyEndpointCache +{ +private: + typedef boost::multi_index_container < + LegacyEndpoint, boost::multi_index::indexed_by < + boost::multi_index::hashed_unique < + BOOST_MULTI_INDEX_MEMBER(PeerFinder::LegacyEndpoint,IPEndpoint,address), + IPEndpoint::hasher> + > + > MapType; + + MapType m_map; + +public: + typedef std::vector FlattenedList; + + LegacyEndpointCache () + { + } + + ~LegacyEndpointCache () + { + } + + /** Attempt to insert the endpoint. + The caller is responsible for making sure the address is valid. + The return value provides a reference to the new or existing endpoint. + The bool indicates whether or not the insertion took place. + */ + std::pair insert (IPEndpoint const& address) + { + std::pair result ( + m_map.insert (LegacyEndpoint (address))); + return std::make_pair (*result.first, result.second); + } + + /** Returns a pointer to the legacy endpoint or nullptr. */ + LegacyEndpoint const* find (IPEndpoint const& address) + { + MapType::iterator iter (m_map.find (address)); + if (iter != m_map.end()) + return &*iter; + return nullptr; + } + + /** Updates the metadata following a connection attempt. + @param canAccept A flag indicating if the connection succeeded. + */ + void checked (IPEndpoint const& address, bool canAccept) + { + LegacyEndpoint const* endpoint (find (address)); + if (endpoint != nullptr) + { + endpoint->checked = true; + endpoint->canAccept = canAccept; + } + } + + struct Compare + { + bool operator() (LegacyEndpoint const* lhs, + LegacyEndpoint const* rhs) const + { + return lhs->lastGet < rhs->lastGet; + } + }; + + /** Appends up to n addresses for establishing outbound peers. */ + void get (std::size_t n, std::vector & result) const + { + FlattenedList list (flatten()); + std::random_shuffle (list.begin(), list.end()); + std::sort (list.begin(), list.end(), Compare()); + n = std::min (n, list.size()); + RelativeTime const now (RelativeTime::fromStartup()); + for (FlattenedList::iterator iter (list.begin()); + n-- && iter!=list.end(); ++iter) + { + result.push_back ((*iter)->address); + (*iter)->lastGet = now; + } + } + + /** Returns a flattened array of pointers to the legacy endpoints. */ + FlattenedList flatten () const + { + FlattenedList list; + list.reserve (m_map.size()); + for (MapType::iterator iter (m_map.begin()); + iter != m_map.end(); ++iter) + list.push_back (&*iter); + return list; + } +}; + +} +} + +#endif diff --git a/src/ripple/peerfinder/impl/Logic.h b/src/ripple/peerfinder/impl/Logic.h index a0bed834d3..d42167bdf3 100644 --- a/src/ripple/peerfinder/impl/Logic.h +++ b/src/ripple/peerfinder/impl/Logic.h @@ -20,45 +20,9 @@ #ifndef RIPPLE_PEERFINDER_LOGIC_H_INCLUDED #define RIPPLE_PEERFINDER_LOGIC_H_INCLUDED -#include "../../ripple/types/api/AgedHistory.h" - -#include "PeerInfo.h" -#include "Slots.h" -#include "Store.h" - -#include -#include "beast/modules/beast_core/system/BeforeBoost.h" -#include -#include -#include -#include -#include - namespace ripple { namespace PeerFinder { -// Tunable constants -enum -{ - // How often we will try to make outgoing connections - secondsPerConnect = 10, - - // How often we send or accept mtENDPOINTS messages per peer - secondsPerEndpoints = 5, - - // How many Endpoint to send in each mtENDPOINTS - numberOfEndpoints = 10, - - // The most Endpoint we will accept in mtENDPOINTS - numberOfEndpointsMax = 20, - - // How many legacy endpoints to keep in our cache - numberOfLegacyEndpoints = 1000, - - // How often legacy endpoints are updated in the database - legacyEndpointUpdateSeconds = 60 * 60 -}; - //-------------------------------------------------------------------------- /* @@ -89,8 +53,6 @@ inline bool operator== (EndpointInfo const& lhs, EndpointInfo const& rhs) return lhs.endpoint == rhs.endpoint; } -typedef AgedHistory > LegacyEndpoints; - //-------------------------------------------------------------------------- typedef boost::multi_index_container < @@ -138,6 +100,7 @@ public: Callback& m_callback; Store& m_store; + Checker& m_checker; Journal m_journal; Config m_config; @@ -150,16 +113,21 @@ public: // Our view of the current set of connected peers. Peers m_peers; - LegacyEndpoints m_legacyEndpoints; - bool m_legacyEndpointsDirty; + LegacyEndpointCache m_legacyCache; + bool m_legacyCacheDirty; //---------------------------------------------------------------------- - Logic (Callback& callback, Store& store, Journal journal) + Logic ( + Callback& callback, + Store& store, + Checker& checker, + Journal journal) : m_callback (callback) , m_store (store) + , m_checker (checker) , m_journal (journal) - , m_legacyEndpointsDirty (false) + , m_legacyCacheDirty (false) { } @@ -174,8 +142,7 @@ public: m_store.loadLegacyEndpoints (list); for (List::const_iterator iter (list.begin()); iter != list.end(); ++iter) - m_legacyEndpoints->insert (*iter); - m_legacyEndpoints.swap(); + m_legacyCache.insert (*iter); m_journal.debug << "Loaded " << list.size() << " legacy endpoints"; } @@ -253,17 +220,6 @@ public: // void createLegacyEndpointList (std::vector & list) { - list.clear (); - list.reserve (m_legacyEndpoints.front().size() + - m_legacyEndpoints.back().size()); - - for (LegacyEndpoints::container_type::const_iterator iter ( - m_legacyEndpoints.front().begin()); iter != m_legacyEndpoints.front().end(); ++iter) - list.push_back (*iter); - - for (LegacyEndpoints::container_type::const_iterator iter ( - m_legacyEndpoints.back().begin()); iter != m_legacyEndpoints.back().end(); ++iter) - list.push_back (*iter); } // Make outgoing connections to bring us up to desired out count @@ -272,13 +228,9 @@ public: { if (m_slots.outDesired > m_slots.outboundCount) { + int const needed (m_slots.outDesired - m_slots.outboundCount); std::vector list; - createLegacyEndpointList (list); - std::random_shuffle (list.begin(), list.end()); - - int needed = m_slots.outDesired - m_slots.outboundCount; - if (needed > list.size()) - needed = list.size(); + m_legacyCache.get (needed, list); #if RIPPLE_USE_PEERFINDER m_callback.connectPeerEndpoints (list); @@ -290,6 +242,7 @@ public: // void fetch (Source& source) { +#if 0 m_journal.debug << "Fetching " << source.name(); Source::IPEndpoints endpoints; @@ -299,16 +252,17 @@ public: { for (Source::IPEndpoints::const_iterator iter (endpoints.begin()); iter != endpoints.end(); ++iter) - m_legacyEndpoints->insert (*iter); + m_legacyCache->insert (*iter); - if (m_legacyEndpoints->size() > (numberOfLegacyEndpoints/2)) + if (m_legacyCache->size() > (numberOfLegacyEndpoints/2)) { - m_legacyEndpoints.swap(); - m_legacyEndpoints->clear(); + m_legacyCache.swap(); + m_legacyCache->clear(); } - m_legacyEndpointsDirty = true; + m_legacyCacheDirty = true; } +#endif } //---------------------------------------------------------------------- @@ -398,6 +352,75 @@ public: } } + // Called when the Checker completes a connectivity test + // + void onCheckEndpoint (PeerID const& id, Checker::Result const& result) + { + if (result.error == boost::asio::error::operation_aborted) + return; + + Peers::iterator iter (m_peers.find (id)); + if (iter != m_peers.end()) + { + PeerInfo const& peer (*iter); + + if (! result.error) + { + peer.checked = true; + peer.canAccept = result.canAccept; + + if (peer.canAccept) + m_journal.info << "Peer " << peer.address << + " passed listening test"; + else + m_journal.warning << "Peer " << peer.address << + " cannot accept incoming connections"; + } + else + { + // VFALCO TODO Should we retry depending on the error? + peer.checked = true; + peer.canAccept = false; + + m_journal.error << "Listening test for " << + peer.address << " failed: " << + result.error.message(); + } + } + else + { + // The peer disconnected before we finished the check + m_journal.debug << "Finished listening test for " << + id << " but the peer disconnected. "; + } + } + + // Called when the Checker completes a connectivity test for a legacy address + // + void onCheckLegacyEndpoint (IPEndpoint const& endpoint, + Checker::Result const& result) + { + if (result.error == boost::asio::error::operation_aborted) + return; + + RelativeTime const now (RelativeTime::fromStartup()); + + if (! result.error) + { + if (result.canAccept) + m_journal.info << "Legacy address " << endpoint << + " passed listening test"; + else + m_journal.warning << "Legacy address " << endpoint << + " cannot accept incoming connections"; + } + else + { + m_journal.error << "Listening test for legacy address " << + endpoint << " failed: " << result.error.message(); + } + } + // Processes a list of Endpoint received from a peer. // void onPeerEndpoints (PeerID const& id, std::vector endpoints) @@ -424,17 +447,43 @@ public: m_callback.chargePeerLoadPenalty(id); } - + // process the list - + { + bool foundZeroHops (false); + bool chargedPenalty (false); + for (std::vector ::const_iterator iter (endpoints.begin()); + iter != endpoints.end(); ++iter) + { + Endpoint const& endpoint (*iter); + if (endpoint.hops == 0) + { + if (! foundZeroHops) + { + foundZeroHops = true; + m_checker.async_test (endpoint.address.withPort ( + endpoint.port), bind (&Logic::onCheckEndpoint, + this, id, _1)); + } + else if (! chargedPenalty) + { + // Only charge them once (?) + chargedPenalty = true; + // More than one zero-hops message?! + m_journal.warning << "Charging peer " << peer.address << + " for sending more than one hops==0 endpoint"; + m_callback.chargePeerLoadPenalty (id); + } + } + } + } + peer.whenReceiveEndpoints = now + secondsPerEndpoints; } else { m_journal.warning << "Charging peer " << peer.address << " for sending too quickly"; - - // Peer sent mtENDPOINTS too often m_callback.chargePeerLoadPenalty (id); } } @@ -446,28 +495,25 @@ public: void onPeerLegacyEndpoint (IPEndpoint const& ep) { - if (ep.isPublic()) + // filter invalid addresses + if (! ep.isPublic()) + return; + + if (ep.port() == 0) + return; + + std::pair result ( + m_legacyCache.insert (ep)); + + if (result.second) { - // insert into front container - std::pair result ( - m_legacyEndpoints->insert (ep)); + // its new + m_legacyCacheDirty = true; + m_journal.trace << "Legacy endpoint: " << ep; - // erase from back container if its new - if (result.second) - { - std::size_t const n (m_legacyEndpoints.back().erase (ep)); - if (n == 0) - { - m_legacyEndpointsDirty = true; - m_journal.trace << "Legacy endpoint: " << ep; - } - } - - if (m_legacyEndpoints->size() > (numberOfLegacyEndpoints/2)) - { - m_legacyEndpoints.swap(); - m_legacyEndpoints->clear(); - } + m_checker.async_test (ep, bind ( + &Logic::onCheckLegacyEndpoint, + this, ep, _1)); } } @@ -475,9 +521,10 @@ public: // void storeLegacyEndpoints () { - if (!m_legacyEndpointsDirty) + if (!m_legacyCacheDirty) return; +#if 0 std::vector list; createLegacyEndpointList (list); @@ -486,7 +533,8 @@ public: m_store.storeLegacyEndpoints (list); - m_legacyEndpointsDirty = false; + m_legacyCacheDirty = false; +#endif } }; diff --git a/src/ripple/peerfinder/impl/Manager.cpp b/src/ripple/peerfinder/impl/Manager.cpp index cde464e265..d7086f7abd 100644 --- a/src/ripple/peerfinder/impl/Manager.cpp +++ b/src/ripple/peerfinder/impl/Manager.cpp @@ -172,9 +172,6 @@ Revised Gnutella Ping Pong Scheme http://rfc-gnutella.sourceforge.net/src/pong-caching.html */ -#include "Logic.h" -#include "StoreSqdb.h" - namespace ripple { namespace PeerFinder { @@ -188,6 +185,7 @@ public: ServiceQueue m_queue; Journal m_journal; StoreSqdb m_store; + CheckerAdapter m_checker; Logic m_logic; DeadlineTimer m_connectTimer; DeadlineTimer m_endpointsTimer; @@ -200,11 +198,12 @@ public: , Thread ("PeerFinder") , m_journal (journal) , m_store (journal) - , m_logic (callback, m_store, journal) + , m_checker (m_queue) + , m_logic (callback, m_store, m_checker, journal) , m_connectTimer (this) , m_endpointsTimer (this) { -#if 0 +#if 1 #if BEAST_MSVC if (beast_isRunningUnderDebugger()) { @@ -281,6 +280,8 @@ public: void onStop () { + m_checker.cancel (); + if (this->Thread::isThreadRunning ()) { m_journal.debug << "Stopping"; diff --git a/src/ripple/peerfinder/impl/PeerInfo.h b/src/ripple/peerfinder/impl/PeerInfo.h index 84a55a19ce..ed1ef993df 100644 --- a/src/ripple/peerfinder/impl/PeerInfo.h +++ b/src/ripple/peerfinder/impl/PeerInfo.h @@ -20,12 +20,6 @@ #ifndef RIPPLE_PEERFINDER_PEERINFO_H_INCLUDED #define RIPPLE_PEERFINDER_PEERINFO_H_INCLUDED -#include "../../ripple/types/api/AgedHistory.h" - -#include "../api/Types.h" - -#include - namespace ripple { namespace PeerFinder { @@ -42,6 +36,8 @@ struct PeerInfo : id (id_) , address (address_) , inbound (inbound_) + , checked (inbound_ ? false : true) + , canAccept (inbound_ ? false : true) , whenSendEndpoints (RelativeTime::fromStartup()) , whenReceiveEndpoints (RelativeTime::fromStartup()) { @@ -51,6 +47,14 @@ struct PeerInfo IPEndpoint address; bool inbound; + // Tells us if we checked the connection. Outbound connections + // are always considered checked since we successfuly connected. + bool mutable checked; + + // Set to indicate if the connection can receive incoming at the + // address advertised in mtENDPOINTS. Only valid if checked is true + bool mutable canAccept; + // The time after which we will send the peer mtENDPOINTS RelativeTime mutable whenSendEndpoints; diff --git a/src/ripple/peerfinder/impl/Slots.h b/src/ripple/peerfinder/impl/Slots.h index 6c2b8edae0..d25f3e993b 100644 --- a/src/ripple/peerfinder/impl/Slots.h +++ b/src/ripple/peerfinder/impl/Slots.h @@ -20,8 +20,6 @@ #ifndef RIPPLE_PEERFINDER_SLOTS_H_INCLUDED #define RIPPLE_PEERFINDER_SLOTS_H_INCLUDED -#include "../api/Config.h" - namespace ripple { namespace PeerFinder { diff --git a/src/ripple/peerfinder/impl/SourceStrings.cpp b/src/ripple/peerfinder/impl/SourceStrings.cpp index 3712b97284..1735bf3a70 100644 --- a/src/ripple/peerfinder/impl/SourceStrings.cpp +++ b/src/ripple/peerfinder/impl/SourceStrings.cpp @@ -17,8 +17,6 @@ */ //============================================================================== -#include "SourceStrings.h" - namespace ripple { namespace PeerFinder { diff --git a/src/ripple/peerfinder/impl/SourceStrings.h b/src/ripple/peerfinder/impl/SourceStrings.h index 867d0faf97..c250433c0e 100644 --- a/src/ripple/peerfinder/impl/SourceStrings.h +++ b/src/ripple/peerfinder/impl/SourceStrings.h @@ -20,8 +20,6 @@ #ifndef RIPPLE_PEERFINDER_SOURCESTRINGS_H_INCLUDED #define RIPPLE_PEERFINDER_SOURCESTRINGS_H_INCLUDED -#include "Source.h" - namespace ripple { namespace PeerFinder { diff --git a/src/ripple/peerfinder/impl/StoreSqdb.h b/src/ripple/peerfinder/impl/StoreSqdb.h index 61057ce838..e0e88c29db 100644 --- a/src/ripple/peerfinder/impl/StoreSqdb.h +++ b/src/ripple/peerfinder/impl/StoreSqdb.h @@ -20,10 +20,6 @@ #ifndef RIPPLE_PEERFINDER_STORESQDB_H_INCLUDED #define RIPPLE_PEERFINDER_STORESQDB_H_INCLUDED -#include "beast/modules/beast_sqdb/beast_sqdb.h" - -#include "Store.h" - namespace ripple { namespace PeerFinder { diff --git a/src/ripple/peerfinder/impl/Tests.cpp b/src/ripple/peerfinder/impl/Tests.cpp index e84c6e909e..836d02bbce 100644 --- a/src/ripple/peerfinder/impl/Tests.cpp +++ b/src/ripple/peerfinder/impl/Tests.cpp @@ -17,8 +17,6 @@ */ //============================================================================== -#include "Logic.h" - namespace ripple { namespace PeerFinder { diff --git a/src/ripple/peerfinder/impl/Tuning.h b/src/ripple/peerfinder/impl/Tuning.h new file mode 100644 index 0000000000..4b5ba5abed --- /dev/null +++ b/src/ripple/peerfinder/impl/Tuning.h @@ -0,0 +1,51 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_PEERFINDER_TUNING_H_INCLUDED +#define RIPPLE_PEERFINDER_TUNING_H_INCLUDED + +namespace ripple { +namespace PeerFinder { + +// Tunable constants +enum +{ + // How often we will try to make outgoing connections + secondsPerConnect = 10, + + // How often we send or accept mtENDPOINTS messages per peer + secondsPerEndpoints = 5, + + // How many Endpoint to send in each mtENDPOINTS + numberOfEndpoints = 10, + + // The most Endpoint we will accept in mtENDPOINTS + numberOfEndpointsMax = 20, + + // How many legacy endpoints to keep in our cache + numberOfLegacyEndpoints = 1000, + + // How often legacy endpoints are updated in the database + legacyEndpointUpdateSeconds = 60 * 60 +}; + +} +} + +#endif diff --git a/src/ripple/peerfinder/ripple_peerfinder.cpp b/src/ripple/peerfinder/ripple_peerfinder.cpp index 4553f547fc..013568123b 100644 --- a/src/ripple/peerfinder/ripple_peerfinder.cpp +++ b/src/ripple/peerfinder/ripple_peerfinder.cpp @@ -17,21 +17,49 @@ */ //============================================================================== - #include "BeastConfig.h" #include "ripple_peerfinder.h" +#include "../../ripple/types/api/AgedHistory.h" + +#include + +#include "beast/modules/beast_core/system/BeforeBoost.h" +#include +#include +#include +#include +#include +#include + +#include "beast/modules/beast_sqdb/beast_sqdb.h" +#include "beast/modules/beast_asio/beast_asio.h" + namespace ripple { using namespace beast; } -# include "impl/Source.h" -# include "impl/SourceStrings.h" -#include "impl/SourceStrings.cpp" +# include "impl/Checker.h" +#include "impl/CheckerAdapter.h" +#include "impl/EndpointCache.h" +#include "impl/Slots.h" +#include "impl/Source.h" +#include "impl/SourceStrings.h" + +# include "impl/LegacyEndpoint.h" +# include "impl/LegacyEndpointCache.h" +# include "impl/PeerInfo.h" +# include "impl/Store.h" +# include "impl/Tuning.h" +#include "impl/StoreSqdb.h" +#include "impl/Logic.h" + +#include "impl/Checker.cpp" #include "impl/Config.cpp" #include "impl/Endpoint.cpp" #include "impl/EndpointCache.cpp" #include "impl/Manager.cpp" #include "impl/Slots.cpp" +#include "impl/SourceStrings.cpp" #include "impl/Tests.cpp" diff --git a/src/ripple/peerfinder/ripple_peerfinder.h b/src/ripple/peerfinder/ripple_peerfinder.h index 69985f0b42..90f2c89ad9 100644 --- a/src/ripple/peerfinder/ripple_peerfinder.h +++ b/src/ripple/peerfinder/ripple_peerfinder.h @@ -28,10 +28,10 @@ using namespace beast; #include "../types/api/RipplePublicKey.h" -#include "api/Types.h" -#include "api/Endpoint.h" -#include "api/Config.h" +# include "api/Endpoint.h" +# include "api/Types.h" #include "api/Callback.h" +#include "api/Config.h" #include "api/Manager.h" #endif diff --git a/src/ripple/types/api/CycledSet.h b/src/ripple/types/api/CycledSet.h new file mode 100644 index 0000000000..a8cadb8381 --- /dev/null +++ b/src/ripple/types/api/CycledSet.h @@ -0,0 +1,110 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_TYPES_CYCLEDSET_H_INCLUDED +#define RIPPLE_TYPES_CYCLEDSET_H_INCLUDED + +namespace ripple { + +/** Cycled set of unique keys. + This provides a system of remembering a set of keys, with aging. Two + containers are kept. When one container fills, the other is cleared + and a swap is performed. A key is considered present if it is in either + container. +*/ +template , + class Allocator = std::allocator > +class CycledSet +{ +private: + typedef boost::unordered_set< + Key, Hash, KeyEqual, Allocator> ContainerType; + typedef typename ContainerType::iterator iterator; + +public: + typedef typename ContainerType::key_type key_type; + typedef typename ContainerType::value_type value_type; + typedef typename ContainerType::size_type size_type; + typedef typename ContainerType::difference_type difference_type; + typedef typename ContainerType::hasher hasher; + typedef typename ContainerType::key_equal key_equal; + typedef typename ContainerType::allocator_type allocator_type; + typedef typename ContainerType::reference reference; + typedef typename ContainerType::const_reference const_reference; + typedef typename ContainerType::pointer pointer; + typedef typename ContainerType::const_pointer const_pointer; + + explicit CycledSet ( + size_type item_max, + Hash hash = Hash(), + KeyEqual equal = KeyEqual(), + Allocator alloc = Allocator()) + : m_max (item_max) + , m_hash (hash) + , m_equal (equal) + , m_alloc (alloc) + , m_front (m_max, hash, equal, alloc) + , m_back (m_max, hash, equal, alloc) + { + } + + // Returns `true` if the next real insert would swap + bool full() const + { + return m_front.size() >= m_max; + } + + // Adds the key to the front if its not in either map + bool insert (key_type const& key) + { + if (full()) + cycle (); + if (m_back.find (key) != m_back.end()) + return false; + std::pair result ( + m_front.insert (key)); + if (result.second) + return true; + return false; + } + + void cycle () + { + std::swap (m_front, m_back); + m_front.clear (); + +#if BOOST_VERSION > 105400 + m_front.reserve (m_max); +#endif + } + +private: + size_type m_max; + hasher m_hash; + key_equal m_equal; + allocator_type m_alloc; + ContainerType m_front; + ContainerType m_back; +}; + +} + +#endif diff --git a/src/ripple/types/ripple_types.h b/src/ripple/types/ripple_types.h index f2517bdb5d..a05bca3460 100644 --- a/src/ripple/types/ripple_types.h +++ b/src/ripple/types/ripple_types.h @@ -26,6 +26,7 @@ #include "beast/modules/beast_core/system/BeforeBoost.h" #include +#include // For ByteOrder #if BEAST_WIN32 @@ -43,6 +44,7 @@ using namespace beast; } #include "api/AgedHistory.h" +#include "api/CycledSet.h" # include "api/Blob.h" # include "api/Base58.h" # include "api/ByteOrder.h" diff --git a/src/ripple/validators/impl/Tuning.h b/src/ripple/validators/impl/Tuning.h index 372f4d60e7..4f1f6f9383 100644 --- a/src/ripple/validators/impl/Tuning.h +++ b/src/ripple/validators/impl/Tuning.h @@ -153,94 +153,6 @@ private: Info m_back_info; }; -//------------------------------------------------------------------------------ - -/** Cycled set of unique keys. */ -template , - class Allocator = std::allocator > -class CycledSet -{ -private: - typedef boost::unordered_set < - Key, Hash, KeyEqual, Allocator> ContainerType; - typedef typename ContainerType::iterator iterator; - -public: - typedef typename ContainerType::key_type key_type; - typedef typename ContainerType::value_type value_type; - typedef typename ContainerType::size_type size_type; - typedef typename ContainerType::difference_type difference_type; - typedef typename ContainerType::hasher hasher; - typedef typename ContainerType::key_equal key_equal; - typedef typename ContainerType::allocator_type allocator_type; - typedef typename ContainerType::reference reference; - typedef typename ContainerType::const_reference const_reference; - typedef typename ContainerType::pointer pointer; - typedef typename ContainerType::const_pointer const_pointer; - - explicit CycledSet ( - size_type item_max, - Hash hash = Hash(), - KeyEqual equal = KeyEqual(), - Allocator alloc = Allocator()) - : m_max (item_max) - , m_hash (hash) - , m_equal (equal) - , m_alloc (alloc) - , m_front (m_max, hash, equal, alloc) - , m_back (m_max, hash, equal, alloc) - { - } - - // Returns `true` if the next real insert would swap - bool full() const - { - return m_front.size() >= m_max; - } - - // Adds the key to the front if its not in either map - bool insert (key_type const& key) - { - if (full()) - cycle (); - if (m_back.find (key) != m_back.end()) - return false; - std::pair result ( - m_front.insert (key)); - if (result.second) - return true; - return false; - } - -#if 0 - bool find (key_type const& key) - { - if (m_front.find (key) != m_front.end()) - return true; - return m_back.find (key) != m_back.end(); - } -#endif - - void cycle () - { - std::swap (m_front, m_back); - m_front.clear (); -#if BOOST_VERSION > 105400 - m_front.reserve (m_max); -#endif - } - -private: - size_type m_max; - hasher m_hash; - key_equal m_equal; - allocator_type m_alloc; - ContainerType m_front; - ContainerType m_back; -}; - } }