diff --git a/Builds/VisualStudio2012/RippleD.vcxproj b/Builds/VisualStudio2012/RippleD.vcxproj index da7afca75..2c5223d37 100644 --- a/Builds/VisualStudio2012/RippleD.vcxproj +++ b/Builds/VisualStudio2012/RippleD.vcxproj @@ -102,6 +102,12 @@ true true + + true + true + true + true + true true @@ -1684,11 +1690,14 @@ + + + diff --git a/Builds/VisualStudio2012/RippleD.vcxproj.filters b/Builds/VisualStudio2012/RippleD.vcxproj.filters index fd977493e..4ad91a261 100644 --- a/Builds/VisualStudio2012/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2012/RippleD.vcxproj.filters @@ -1125,6 +1125,9 @@ [1] Ripple\resource\impl + + [1] Ripple\peerfinder\impl + @@ -2304,6 +2307,15 @@ [1] Ripple\peerfinder\impl + + [1] Ripple\peerfinder\impl + + + [1] Ripple\peerfinder\impl + + + [1] Ripple\peerfinder\impl + diff --git a/src/ripple/peerfinder/api/Manager.h b/src/ripple/peerfinder/api/Manager.h index 7beb3b8df..e955329e1 100644 --- a/src/ripple/peerfinder/api/Manager.h +++ b/src/ripple/peerfinder/api/Manager.h @@ -70,12 +70,30 @@ public: virtual void addFallbackURL (std::string const& name, std::string const& url) = 0; - /** Called when a new peer connection is established. + /** Called when an (outgoing) connection attempt to a particular address + is about to begin. + */ + virtual void onPeerConnectAttemptBegins (IPAddress const& address) = 0; + + /** Called when an (outgoing) connection attempt to a particular address + completes, whether it succeeds or fails. + */ + virtual void onPeerConnectAttemptCompletes (IPAddress const& address, + bool success) = 0; + + /** Called when a new peer connection is established but before get + we exchange hello messages. + */ + virtual void onPeerConnected (IPAddress const& address, + bool inbound) = 0; + + /** Called when a new peer connection is established after we exchange + hello messages. Internally, we add the peer to our tracking table, validate that we can connect to it, and begin advertising it to others after we are sure that its connection is stable. */ - virtual void onPeerConnected (PeerID const& id, + virtual void onPeerHandshake (PeerID const& id, IPAddress const& address, bool inbound) = 0; diff --git a/src/ripple/peerfinder/impl/Cache.h b/src/ripple/peerfinder/impl/Cache.h index bcfaec683..e8a815f5a 100644 --- a/src/ripple/peerfinder/impl/Cache.h +++ b/src/ripple/peerfinder/impl/Cache.h @@ -31,6 +31,9 @@ private: typedef boost::unordered_map < IPAddress, CachedEndpoint, IPAddress::hasher> Table; + typedef std::set < + IPAddress*, PtrCompareFunctor > AddressSet; + Journal m_journal; Table m_endpoints; @@ -39,9 +42,15 @@ private: // in oldest-to-newest order. The oldest item is at the head. List m_list; + // A set of IP addresses which we know about + AddressSet m_addresses; + + unsigned int m_generation; + public: explicit Cache (Journal journal) : m_journal (journal) + , m_generation(0) { } @@ -54,8 +63,7 @@ public: return m_endpoints.size(); } - // Cycle the tables - void cycle(DiscreteTime now) + void sweep (DiscreteTime now) { List ::iterator iter (m_list.begin()); @@ -82,10 +90,16 @@ public: std::pair result ( m_endpoints.emplace (message.address, CachedEndpoint(message, now))); - if (!result.second) - { // There was already an entry for this endpoint. Update it. - CachedEndpoint& entry (result.first->second); + CachedEndpoint& entry (result.first->second); + // We ignore messages that we receive at a higher hop count. We should + // consider having a counter that monotonically increases per reboot + // so that we can detect a server restart. + if (!result.second && (entry.message.hops > message.hops)) + return; + + if (!result.second) + { entry.message.hops = std::min (entry.message.hops, message.hops); // Copy the other fields based on uptime @@ -103,8 +117,6 @@ public: m_list.erase (m_list.iterator_to(entry)); } - CachedEndpoint& entry (result.first->second); - m_journal.debug << message.address << "valid " << entry.whenExpires << " (" << entry.message.incomingSlotsAvailable << @@ -113,8 +125,8 @@ public: m_list.push_back (entry); } - // Returns all the known endpoints we have, sorted by distance (that is, - // by hop). + // Get all known endpoints, sorted by distance (i.e. by hop). + // Giveaways getGiveawayList() { Giveaways giveaway; @@ -122,8 +134,7 @@ public: for (List ::iterator iter (m_list.begin()); iter != m_list.end(); iter++) { - if (iter->message.hops < maxPeerHopCount) - giveaway.add (*iter); + giveaway.add (*iter); } return giveaway; diff --git a/src/ripple/peerfinder/impl/Giveaways.h b/src/ripple/peerfinder/impl/Giveaways.h index 126f12f34..1ed4f6f11 100644 --- a/src/ripple/peerfinder/impl/Giveaways.h +++ b/src/ripple/peerfinder/impl/Giveaways.h @@ -33,6 +33,7 @@ class Giveaways public: typedef std::vector ::iterator iterator; + typedef std::vector ::reverse_iterator reverse_iterator; Giveaways() : m_hopVector(maxPeerHopCount) @@ -65,6 +66,8 @@ public: m_shuffled = true; } + // Provides an iterator that starts from hop 0 and goes all the way to + // the max hop. iterator begin () { return m_hopVector.begin(); @@ -74,6 +77,18 @@ public: { return m_hopVector.end(); } + + // Provides an iterator that starts from the max hop and goes all the way + // down to hop 0. + reverse_iterator rbegin () + { + return m_hopVector.rbegin(); + } + + reverse_iterator rend () + { + return m_hopVector.rend(); + } }; } diff --git a/src/ripple/peerfinder/impl/GiveawaysAtHop.h b/src/ripple/peerfinder/impl/GiveawaysAtHop.h index b904c7082..269136448 100644 --- a/src/ripple/peerfinder/impl/GiveawaysAtHop.h +++ b/src/ripple/peerfinder/impl/GiveawaysAtHop.h @@ -47,13 +47,10 @@ public: // that we will be returning. void add (CachedEndpoint &endpoint) { - if (endpoint.message.hops < maxPeerHopCount) - { - if (endpoint.color) - m_list.push_back(&endpoint); - else - m_used.push_back(&endpoint); - } + if (endpoint.color) + m_list.push_back(&endpoint); + else + m_used.push_back(&endpoint); } // Shuffles the list of peers we are about to hand out. @@ -63,7 +60,7 @@ public: } // Prepare to begin iterating over the entire set of peers again. - void reset () + bool reset () { // We need to add any entries from the stale vector in the tail // end of the fresh vector. We do not need to shuffle them. @@ -73,8 +70,19 @@ public: m_used.clear(); } - // We need to start from the beginning again. + // And start iterating the list from the beginning. m_position = m_list.begin(); + + // Return whether there is anything in this vector to iterate. + return !empty(); + } + + // Determines if we have any giveaways at the current hop could; if we + // do not you should not dereference the iterator returned from "begin" or + // "rbegin" + bool empty() const + { + return m_list.empty(); } // This is somewhat counterintuitive, but it doesn't really "begin" diff --git a/src/ripple/peerfinder/impl/Logic.h b/src/ripple/peerfinder/impl/Logic.h index d35665e02..0bb2ee219 100644 --- a/src/ripple/peerfinder/impl/Logic.h +++ b/src/ripple/peerfinder/impl/Logic.h @@ -55,7 +55,21 @@ typedef boost::multi_index_container < */ class Logic { +private: + typedef std::set < IPAddress > IPAddressSet; + public: + template < class T, class C = std::less > + struct PtrComparator + { + bool operator()(const T *x, const T *y) const + { + C comp; + + return comp(*x, *y); + } + }; + struct State { State () @@ -102,6 +116,9 @@ public: LegacyEndpointCache m_legacyCache; + // Our set of connection attempts currently in-progress + IPAddressSet m_attemptsInProgress; + //-------------------------------------------------------------------------- Logic ( @@ -201,6 +218,40 @@ public: return true; } + // Return endpoints to which we want to try to make outgoing connections. + // We preferentially return endpoints which are far away from as to try to + // improve the algebraic connectivity of the network graph. For more see + // http://en.wikipedia.org/wiki/Algebraic_connectivity + // + void getNewOutboundEndpoints (int needed, std::vector & list) + { + Giveaways giveaway (m_cache.getGiveawayList()); + int count = 0; + + for (Giveaways::reverse_iterator iter (giveaway.rbegin()); + iter != giveaway.rend(); ++iter) + { + // Check whether we have anything at the current hop level. + iter->reset (); + + for(GiveawaysAtHop::iterator iter2 (iter->begin()); + iter2 != iter->end() && (count != needed); ++iter2) + { + CachedEndpoint *ep (*iter2); + + // NIKB TODO we need to check whether this peer is already + // connected prior to just returning it and wasting time + // trying to establish a redundant connection. + + if(ep->message.incomingSlotsAvailable != 0) + { + list.push_back(ep->message.address); + ++count; + } + } + } + } + // If configured to make outgoing connections, do us in order // to bring us up to desired out count. // @@ -215,7 +266,8 @@ public: int const needed (std::min ( m_slots.outDesired - m_slots.outboundCount, int (maxAddressesPerAttempt))); - m_legacyCache.get (needed, list, get_now()); + + getNewOutboundEndpoints (needed, list); } } @@ -288,29 +340,69 @@ public: m_sources.push_back (source); } - // Called periodically to cycle and age the various caches. + // Called periodically to sweep the cache and remove aged out items. // - void cycleCache() + void sweepCache () { - m_cache.cycle (get_now()); + m_cache.sweep (get_now()); for (Peers::iterator iter (m_peers.begin()); iter != m_peers.end(); ++iter) iter->received.cycle(); } - void onPeerConnecting () + // Called when an outbound connection attempt is started + // + void onPeerConnectAttemptBegins (IPAddress const& address) { + std::pair ret = + m_attemptsInProgress.insert (address); + + // We are always notified of connection attempts so if we think that + // something was in progress and a connection attempt begins then + // something is very wrong. + + bassert (ret.second); + + if (ret.second) + m_journal.debug << "Attempt for " << address << " is in progress"; + else + m_journal.error << "Attempt for " << address << " was already in progress"; + } + + // Called when an outbound connection attempt completes + // + void onPeerConnectAttemptCompletes (IPAddress const& address, bool success) + { + IPAddressSet::size_type ret = m_attemptsInProgress.erase (address); + + bassert (ret == 1); + + if (ret == 1) + m_journal.debug << "Attempt for " << address << + " completed: " << (success ? "success" : "failure"); + else + m_journal.error << "Attempt for untracked " << address << + " completed: " << (success ? "success" : "failure"); + } + + // Called when a peer connection is established but before a handshake + // occurs. + void onPeerConnected (IPAddress const& address, bool incoming) + { + m_journal.error << "Connected: " << address << + (incoming ? " (incoming)" : " (outgoing)"); } // Called when a peer connection is established. // We are guaranteed that the PeerID is not already in our map. // but we are *NOT* guaranteed that the IP isn't. So we need // to be careful. - void onPeerConnected (PeerID const& id, + void onPeerHandshake (PeerID const& id, IPAddress const& address, bool inbound) { - m_journal.debug << "Peer connected: " << address; + m_journal.debug << "Handshake: " << address; + // If this is outgoing, record the success if (! inbound) m_legacyCache.checked (address, true); @@ -332,14 +424,14 @@ public: // Called when a peer is disconnected. // We are guaranteed to get this exactly once for each - // corresponding call to onPeerConnected. + // corresponding call to onPeerHandshake. // void onPeerDisconnected (PeerID const& id) { Peers::iterator iter (m_peers.find (id)); bassert (iter != m_peers.end()); PeerInfo const& peer (*iter); - m_journal.debug << "Peer disconnected: " << peer.address; + m_journal.debug << "Disconnected: " << peer.address; m_slots.dropPeer (m_config, peer.inbound); // VFALCO NOTE Update fixed peers count (HACKED) @@ -418,11 +510,12 @@ public: while(iter2 != iter->end()) { - // FIXME NIKB check if the peer wants to receive this endpoint - // and add it to the list of endpoints we will send if he does. + // FIXME NIKB check if the peer wants to receive this + // endpoint and add it to the list of endpoints we will + // send if he does. if(false) - iter->erase(iter2); + iter2 = iter->erase(iter2); else ++iter2; } @@ -439,11 +532,11 @@ public: { if (! m_peers.empty()) { - m_journal.trace << "Sending endpoints to our peers"; + m_journal.trace << "Sending endpoints..."; DiscreteTime const now (get_now()); - // fill in endpoints + // fill in endpoints. Giveaways giveaway(m_cache.getGiveawayList()); for (Peers::iterator iter (m_peers.begin()); @@ -486,10 +579,10 @@ public: peer.canAccept = result.canAccept; if (peer.canAccept) - m_journal.info << "Peer " << peer.address << + m_journal.info << peer.address << " passed listening test"; else - m_journal.warning << "Peer " << peer.address << + m_journal.warning << peer.address << " cannot accept incoming connections"; } else @@ -533,13 +626,13 @@ public: // numberOfEndpoints peers in a single message if (list.size() > numberOfEndpoints) { - m_journal.warning << "Charging peer " << peer.address << + m_journal.warning << "Charging " << peer.address << " for sending too many endpoints"; m_callback.chargePeerLoadPenalty(id); } - m_journal.debug << "Peer " << peer.address << + m_journal.debug << peer.address << " sent us " << list.size() << " endpoints."; // Process each entry @@ -553,7 +646,7 @@ public: // Remember that this peer gave us this address peer.received.insert (message.address); - m_journal.debug << "Received peer " << message.address << + m_journal.debug << message.address << " at " << message.hops << " hops."; if (message.hops == 0) @@ -601,7 +694,7 @@ public: if (neighborCount > 1) { - m_journal.warning << "Peer " << peer.address << + m_journal.warning << peer.address << " sent " << neighborCount << " entries with hops=0"; // VFALCO TODO Should we apply load charges? } diff --git a/src/ripple/peerfinder/impl/Manager.cpp b/src/ripple/peerfinder/impl/Manager.cpp index fb72c49d5..b48b97526 100644 --- a/src/ripple/peerfinder/impl/Manager.cpp +++ b/src/ripple/peerfinder/impl/Manager.cpp @@ -299,19 +299,36 @@ public: // VFALCO TODO This needs to be implemented } - void onPeerConnecting () + void onPeerConnectAttemptBegins (IPAddress const& address) { m_queue.dispatch ( m_context.wrap ( - bind (&Logic::onPeerConnecting, &m_logic))); + bind (&Logic::onPeerConnectAttemptBegins, &m_logic, + address))); } - void onPeerConnected (PeerID const& id, - IPAddress const& address, bool incoming) + void onPeerConnectAttemptCompletes (IPAddress const& address, bool success) + { + m_queue.dispatch ( + m_context.wrap ( + bind (&Logic::onPeerConnectAttemptCompletes, &m_logic, + address, success))); + } + + void onPeerConnected (const IPAddress &address, bool incoming) { m_queue.dispatch ( m_context.wrap ( bind (&Logic::onPeerConnected, &m_logic, + address, incoming))); + } + + void onPeerHandshake (PeerID const& id, + IPAddress const& address, bool incoming) + { + m_queue.dispatch ( + m_context.wrap ( + bind (&Logic::onPeerHandshake, &m_logic, id, address, incoming))); } @@ -402,7 +419,7 @@ public: { m_queue.dispatch ( m_context.wrap ( - bind (&Logic::cycleCache, &m_logic))); + bind (&Logic::sweepCache, &m_logic))); m_cacheTimer.setExpiration (cacheSecondsToLive); } diff --git a/src/ripple/peerfinder/impl/PeerInfo.h b/src/ripple/peerfinder/impl/PeerInfo.h index 14bf02983..5ceadbdb9 100644 --- a/src/ripple/peerfinder/impl/PeerInfo.h +++ b/src/ripple/peerfinder/impl/PeerInfo.h @@ -30,6 +30,24 @@ namespace PeerFinder { // we keep one of these for each connected peer struct PeerInfo { + enum State + { + // Some peculiar, unknown state + stateUnknown, + + // A connection attempt is in progress + stateConnecting, + + // A connection has been established but no handshake yet + stateConnected, + + // A connection has been established and the handshake has completed + stateEstablished, + + // A connection (of some kind) that is being torn down + stateDisconnecting + }; + PeerInfo (PeerID const& id_, IPAddress const& address_, bool inbound_, @@ -37,9 +55,11 @@ struct PeerInfo : id (id_) , address (address_) , inbound (inbound_) + , fixed (false) , checked (inbound_ ? false : true) , canAccept (inbound_ ? false : true) , connectivityCheckInProgress (false) + , peerState (stateUnknown) , whenSendEndpoints (now) , whenAcceptEndpoints (now) { @@ -49,6 +69,9 @@ struct PeerInfo IPAddress address; bool inbound; + // Set to indicate that this is a fixed peer. + bool fixed; + // Tells us if we checked the connection. Outbound connections // are always considered checked since we successfuly connected. bool mutable checked; @@ -61,6 +84,9 @@ struct PeerInfo // progress. Valid always. bool mutable connectivityCheckInProgress; + // Indicates the state for this peer + State peerState; + // The time after which we will send the peer mtENDPOINTS DiscreteTime mutable whenSendEndpoints; diff --git a/src/ripple/peerfinder/impl/PtrCompareFunctor.h b/src/ripple/peerfinder/impl/PtrCompareFunctor.h new file mode 100644 index 000000000..2b9be743f --- /dev/null +++ b/src/ripple/peerfinder/impl/PtrCompareFunctor.h @@ -0,0 +1,46 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_PEERFINDER_PTRCOMPAREFUNC_H_INCLUDED +#define RIPPLE_PEERFINDER_PTRCOMPAREFUNC_H_INCLUDED + +namespace ripple { +namespace PeerFinder { + +//------------------------------------------------------------------------------ + +/** Compare two instances of a class of type T using the comparator specified + by class C via pointers. This does not compare the pointers themselves but + what the pointers point to. +*/ +template < class T, class C = std::less > +struct PtrCompareFunctor +{ + bool operator()(T const *lhs, T const *rhs) const + { + C comp; + + return comp(*lhs, *rhs); + } +}; + +} +} + +#endif diff --git a/src/ripple/peerfinder/impl/Resolver.cpp b/src/ripple/peerfinder/impl/Resolver.cpp new file mode 100644 index 000000000..5c9bf3725 --- /dev/null +++ b/src/ripple/peerfinder/impl/Resolver.cpp @@ -0,0 +1,185 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#if 0 +namespace ripple { +namespace PeerFinder { + +class ResolverImp + : public Resolver + , private Thread + , private LeakChecked +{ +private: + class Request; + + struct State + { + List list; + }; + + typedef SharedData SharedState; + + SharedState m_state; + boost::asio::io_service m_io_service; + boost::optional m_work; + + //-------------------------------------------------------------------------- + + static boost::asio::ip::tcp::endpoint fromIPAddress ( + IPAddress const& ipEndpoint) + { + if (ipEndpoint.isV4 ()) + { + return boost::asio::ip::tcp::endpoint ( + boost::asio::ip::address_v4 ( + ipEndpoint.v4().value), + ipEndpoint.port ()); + } + bassertfalse; + return boost::asio::ip::tcp::endpoint (); + } + + //-------------------------------------------------------------------------- + + class Request + : public SharedObject + , public List ::Node + , private LeakChecked + { + public: + typedef SharedPtr Ptr; + typedef boost::asio::ip::tcp Protocol; + typedef boost::system::error_code error_code; + typedef Protocol::socket socket_type; + typedef Protocol::endpoint endpoint_type; + + ResolverImp& m_owner; + boost::asio::io_service& m_io_service; + IPAddress m_address; + AbstractHandler m_handler; + socket_type m_socket; + boost::system::error_code m_error; + bool m_canAccept; + + Request (ResolverImp& owner, boost::asio::io_service& io_service, + IPAddress const& address, AbstractHandler handler) + : m_owner (owner) + , m_io_service (io_service) + , m_address (address) + , m_handler (handler) + , m_socket (m_io_service) + , m_canAccept (false) + { + m_owner.add (*this); + + m_socket.async_connect (fromIPAddress (m_address), + wrapHandler (boost::bind (&Request::handle_connect, Ptr(this), + boost::asio::placeholders::error), m_handler)); + } + + ~Request () + { + Result result; + result.address = m_address; + result.error = m_error; + m_io_service.wrap (m_handler) (result); + + m_owner.remove (*this); + } + + void cancel () + { + m_socket.cancel(); + } + + void handle_connect (boost::system::error_code ec) + { + m_error = ec; + if (ec) + return; + + m_canAccept = true; + } + }; + + //-------------------------------------------------------------------------- + + void add (Request& request) + { + SharedState::Access state (m_state); + state->list.push_back (request); + } + + void remove (Request& request) + { + SharedState::Access state (m_state); + state->list.erase (state->list.iterator_to (request)); + } + + void run () + { + m_io_service.run (); + } + +public: + ResolverImp () + : Thread ("PeerFinder::Resolver") + , m_work (boost::in_place (boost::ref (m_io_service))) + { + startThread (); + } + + ~ResolverImp () + { + // cancel pending i/o + cancel(); + + // destroy the io_service::work object + m_work = boost::none; + + // signal and wait for the thread to exit gracefully + stopThread (); + } + + void cancel () + { + SharedState::Access state (m_state); + for (List ::iterator iter (state->list.begin()); + iter != state->list.end(); ++iter) + iter->cancel(); + } + + void async_test (IPAddress const& endpoint, + AbstractHandler handler) + { + new Request (*this, m_io_service, endpoint, handler); + } +}; + +//------------------------------------------------------------------------------ + +Resolver* Resolver::New () +{ + return new ResolverImp; +} + +} +} +#endif diff --git a/src/ripple/peerfinder/impl/Resolver.h b/src/ripple/peerfinder/impl/Resolver.h new file mode 100644 index 000000000..05c184103 --- /dev/null +++ b/src/ripple/peerfinder/impl/Resolver.h @@ -0,0 +1,88 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_PEERFINDER_RESOLVER_H_INCLUDED +#define RIPPLE_PEERFINDER_RESOLVER_H_INCLUDED + +namespace ripple { +namespace PeerFinder { + +/** Performs asynchronous domain name resolution. */ +class Resolver +{ +public: + /** Create the service. + This will automatically start the associated thread and io_service. + */ + static Resolver* New (); + + /** Destroy the service. + Any pending I/O operations will be canceled. This call blocks until + all pending operations complete (either with success or with + operation_aborted) and the associated thread and io_service have + no more work remaining. + */ + virtual ~Resolver () { } + + /** Cancel pending I/O. + This issues cancel orders for all pending I/O operations and then + returns immediately. Handlers will receive operation_aborted errors, + or if they were already queued they will complete normally. + */ + virtual void cancel () = 0; + + struct Result + { + Result () + { } + + /** The original name string */ + std::string name; + + /** The error code from the operation. */ + boost::system::error_code error; + + /** The resolved address. + Only defined if there is no error. + If the original name string contains a port specification, + it will be set in the resolved IPAddress. + */ + IPAddress address; + }; + + /** Performs an async resolution on the specified name. + The port information, if present, will be passed through. + */ + template + void async_resolve (std::string const& name, + BEAST_MOVE_ARG(Handler) handler) + { + async_resolve (name, + AbstractHandler ( + BEAST_MOVE_CAST(Handler)(handler))); + } + + virtual void async_resolve (std::string const& name, + AbstractHandler handler) = 0; +}; + +} +} + +#endif diff --git a/src/ripple/peerfinder/ripple_peerfinder.cpp b/src/ripple/peerfinder/ripple_peerfinder.cpp index 09c18e85d..e255fd1e9 100644 --- a/src/ripple/peerfinder/ripple_peerfinder.cpp +++ b/src/ripple/peerfinder/ripple_peerfinder.cpp @@ -46,10 +46,12 @@ using namespace beast; #include "impl/PrivateTypes.h" # include "impl/Tuning.h" # include "impl/Checker.h" +# include "impl/Resolver.h" #include "impl/CheckerAdapter.h" # include "impl/CachedEndpoint.h" # include "impl/GiveawaysAtHop.h" # include "impl/Giveaways.h" +#include "impl/PtrCompareFunctor.h" #include "impl/Cache.h" #include "impl/Slots.h" #include "impl/Source.h" @@ -67,6 +69,7 @@ using namespace beast; #include "impl/Endpoint.cpp" #include "impl/Cache.cpp" #include "impl/Manager.cpp" +#include "impl/Resolver.cpp" #include "impl/Slots.cpp" #include "impl/SourceStrings.cpp" #include "impl/Tests.cpp" diff --git a/src/ripple_app/peers/Peer.cpp b/src/ripple_app/peers/Peer.cpp index 9691e8d40..f5b524bd3 100644 --- a/src/ripple_app/peers/Peer.cpp +++ b/src/ripple_app/peers/Peer.cpp @@ -332,6 +332,8 @@ private: else m_usage = m_resourceManager.newOutboundEndpoint (m_remoteAddress); + getApp ().getPeers ().peerConnected(m_remoteAddress, m_isInbound); + // Must compute mCookieHash before receiving a hello. sendHello (); startReadHeader (); @@ -378,6 +380,8 @@ private: else m_usage = m_resourceManager.newOutboundEndpoint (m_remoteAddress); + getApp ().getPeers ().peerConnected(m_remoteAddress, m_isInbound); + // Must compute mCookieHash before receiving a hello. sendHello (); startReadHeader (); @@ -624,7 +628,13 @@ void PeerImp::connect (const std::string& strIp, int iPort) if (!err) { - WriteLog (lsINFO, Peer) << "Peer: Connect: Outbound: " << addressToString (this) << ": " << mIpPort.first << " " << mIpPort.second; + WriteLog (lsINFO, Peer) << "Peer: Connect: Outbound: " << + addressToString (this) << ": " << + mIpPort.first << " " << mIpPort.second; + + // Notify peer finder that we have a connection attempt in-progress + getApp ().getPeers ().getPeerFinder ().onPeerConnectAttemptBegins( + IPAddress::from_string(strIp).withPort(iPortAct) ); boost::asio::async_connect ( getNativeSocket (), @@ -640,14 +650,23 @@ void PeerImp::connect (const std::string& strIp, int iPort) // Connect ssl as client. void PeerImp::handleConnect (const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it) { + // Notify peer finder about the status of this in-progress connection attempt + getApp ().getPeers ().getPeerFinder ().onPeerConnectAttemptCompletes( + IPAddress::from_string(getIP()).withPort(getPort()), !error ); + if (error) { - WriteLog (lsINFO, Peer) << "Peer: Connect: Error: " << error.category ().name () << ": " << error.message () << ": " << error; + WriteLog (lsINFO, Peer) << "Peer: Connect: Error: " << + getIP() << ":" << getPort() << + " (" << error.category ().name () << + ": " << error.message () << + ": " << error << ")"; detach ("hc", true); } else { - WriteLog (lsINFO, Peer) << "Connect peer: success."; + WriteLog (lsINFO, Peer) << "Peer: Connect: Success: " << + getIP() << ":" << getPort(); getHandshakeStream ().set_verify_mode (boost::asio::ssl::verify_none); @@ -1158,7 +1177,7 @@ void PeerImp::recvHello (protocol::TMHello& packet) getApp().getPeers ().peerVerified (shared_from_this ()); } - if (! getApp().getPeers ().peerConnected (shared_from_this (), mNodePublic, getIP (), getPort ())) + if (! getApp().getPeers ().peerHandshake (shared_from_this (), mNodePublic, getIP (), getPort ())) { // Already connected, self, or some other reason. WriteLog (lsINFO, Peer) << "Recv(Hello): Disconnect: Extraneous connection."; diff --git a/src/ripple_app/peers/Peers.cpp b/src/ripple_app/peers/Peers.cpp index cafd063b4..c59f0dc4b 100644 --- a/src/ripple_app/peers/Peers.cpp +++ b/src/ripple_app/peers/Peers.cpp @@ -116,9 +116,7 @@ public: { PeerFinder::Config config; -#if RIPPLE_USE_PEERFINDER config.maxPeerCount = getConfig ().PEERS_MAX; -#endif config.wantIncoming = (! getConfig ().PEER_PRIVATE) && @@ -271,9 +269,18 @@ public: bool getTopNAddrs (int n, std::vector& addrs); bool savePeer (const std::string& strIp, int iPort, char code); + // disconnect the specified peer + void disconnectPeer (PeerFinder::PeerID const &id, bool graceful) + { + // NIKB TODO + } + + // A peer connected but we only have the IP address so far. + void peerConnected (const IPAddress& address, bool incoming); + // We know peers node public key. // <-- bool: false=reject - bool peerConnected (Peer::ref peer, const RippleAddress& naPeer, const std::string& strIP, int iPort); + bool peerHandshake (Peer::ref peer, const RippleAddress& naPeer, const std::string& strIP, int iPort); // No longer connected. void peerDisconnected (Peer::ref peer, const RippleAddress& naPeer); @@ -641,7 +648,7 @@ void PeersImp::connectTo (const std::string& strIp, int iPort) // <-- true, if already connected. Peer::pointer PeersImp::peerConnect (const std::string& strIp, int iPort) { - IPAndPortNumber pipPeer = make_pair (strIp, iPort); + IPAndPortNumber pipPeer = make_pair (strIp, iPort); Peer::pointer ppResult; { @@ -716,9 +723,14 @@ uint64 PeersImp::assignPeerId () return ++mLastPeer; } +void PeersImp::peerConnected (const IPAddress& address, bool incoming) +{ + getPeerFinder ().onPeerConnected (address, incoming); +} + // Now know peer's node public key. Determine if we want to stay connected. // <-- bNew: false = redundant -bool PeersImp::peerConnected (Peer::ref peer, const RippleAddress& naPeer, +bool PeersImp::peerHandshake (Peer::ref peer, const RippleAddress& naPeer, const std::string& strIP, int iPort) { bool bNew = false; @@ -745,7 +757,7 @@ bool PeersImp::peerConnected (Peer::ref peer, const RippleAddress& naPeer, // Notify peerfinder since this is a connection that we didn't // know about and are keeping // - getPeerFinder ().onPeerConnected (RipplePublicKey ( + getPeerFinder ().onPeerHandshake (RipplePublicKey ( peer->getNodePublic()), peer->getPeerEndpoint(), peer->isInbound()); diff --git a/src/ripple_app/peers/Peers.h b/src/ripple_app/peers/Peers.h index 76d5f0012..ac02f035a 100644 --- a/src/ripple_app/peers/Peers.h +++ b/src/ripple_app/peers/Peers.h @@ -62,9 +62,13 @@ public: virtual bool getTopNAddrs (int n, std::vector& addrs) = 0; virtual bool savePeer (const std::string& strIp, int iPort, char code) = 0; + // A peer connection has been established, but we know nothing about it at + // this point beyond the IP address. + virtual void peerConnected (const IPAddress& address, bool incoming) = 0; + // We know peers node public key. // <-- bool: false=reject - virtual bool peerConnected (Peer::ref peer, const RippleAddress& naPeer, const std::string& strIP, int iPort) = 0; + virtual bool peerHandshake (Peer::ref peer, const RippleAddress& naPeer, const std::string& strIP, int iPort) = 0; // No longer connected. virtual void peerDisconnected (Peer::ref peer, const RippleAddress& naPeer) = 0;