From 7c0c2419f7726e759d4e9b8d6d61ae58e8bc3d94 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Tue, 7 Oct 2014 18:00:14 -0700 Subject: [PATCH] Refactor PeerFinder: Previously, the PeerFinder manager constructed with a Callback object provided by the owner which was used to perform operations like connecting, disconnecting, and sending messages. This made it difficult to change the overlay code because a single call into the PeerFinder could cause both OverlayImpl and PeerImp to be re-entered one or more times, sometimes while holding a recursive mutex. This change eliminates the callback by changing PeerFinder functions to return values indicating the action the caller should take. As a result of this change the PeerFinder no longer needs its own dedicated thread. OverlayImpl is changed to call into PeerFinder on a timer to perform periodic activities. Furthermore the Checker class used to perform connectivity checks has been refactored. It no longer uses an abstract base class, in order to not type-erase the handler passed to async_connect (ensuring compatibility with coroutines). To allow unit tests that don't need a network, the Logic class is now templated on the Checker type. Currently the Manager provides its own io_service. However, this can easily be changed so that the io_service is provided upon construction. Summary * Remove unused SiteFiles dependency injection * Remove Callback and update signatures for public APIs * Remove obsolete functions * Move timer to overlay * Steps toward a shared io_service * Templated, simplified Checker * Tidy up Checker declaration --- Builds/VisualStudio2013/RippleD.vcxproj | 5 - .../VisualStudio2013/RippleD.vcxproj.filters | 6 - src/ripple/overlay/impl/OverlayImpl.cpp | 156 ++-- src/ripple/overlay/impl/OverlayImpl.h | 43 +- src/ripple/overlay/impl/PeerImp.cpp | 118 +-- src/ripple/overlay/impl/PeerImp.h | 48 +- src/ripple/peerfinder/Manager.h | 91 ++- src/ripple/peerfinder/impl/Checker.cpp | 181 ----- src/ripple/peerfinder/impl/Checker.h | 209 +++++- src/ripple/peerfinder/impl/CheckerAdapter.h | 105 --- src/ripple/peerfinder/impl/Handouts.h | 5 + src/ripple/peerfinder/impl/Logic.h | 700 ++++++++---------- src/ripple/peerfinder/impl/Manager.cpp | 262 +++---- src/ripple/peerfinder/impl/Source.h | 1 + src/ripple/peerfinder/impl/StoreSqdb.h | 1 + src/ripple/peerfinder/sim/Tests.cpp | 2 +- src/ripple/sitefiles/api/Manager.h | 2 +- src/ripple/unity/peerfinder.cpp | 1 - 18 files changed, 816 insertions(+), 1120 deletions(-) delete mode 100644 src/ripple/peerfinder/impl/Checker.cpp delete mode 100644 src/ripple/peerfinder/impl/CheckerAdapter.h diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj index a64f40de29..0145acd94f 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj +++ b/Builds/VisualStudio2013/RippleD.vcxproj @@ -2837,13 +2837,8 @@ - - True - - - True diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters index 00138232b0..73e1283677 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters @@ -3984,15 +3984,9 @@ ripple\peerfinder\impl - - ripple\peerfinder\impl - ripple\peerfinder\impl - - ripple\peerfinder\impl - ripple\peerfinder\impl diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index d8bc765322..1feb958d24 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -20,7 +20,6 @@ #include #include #include - #include #if DOXYGEN @@ -63,15 +62,12 @@ OverlayImpl::OverlayImpl (Stoppable& parent, , m_child_count (1) , m_journal (deprecatedLogs().journal("Overlay")) , m_resourceManager (resourceManager) - , m_peerFinder (add (PeerFinder::Manager::New ( - *this, - siteFiles, - pathToDbFileOrDirectory, - *this, - get_seconds_clock (), - deprecatedLogs().journal("PeerFinder")))) + , m_peerFinder (add (PeerFinder::Manager::New (*this, + pathToDbFileOrDirectory, get_seconds_clock (), + deprecatedLogs().journal("PeerFinder")))) , m_io_service (io_service) , m_ssl_context (ssl_context) + , timer_(io_service) , m_resolver (resolver) , m_nextShortId (0) { @@ -222,89 +218,6 @@ OverlayImpl::remove (PeerFinder::Slot::ptr const& slot) release(); } -//-------------------------------------------------------------------------- -// -// PeerFinder::Callback -// -//-------------------------------------------------------------------------- - -void -OverlayImpl::connect (std::vector const& list) -{ - for (std::vector ::const_iterator iter (list.begin()); - iter != list.end(); ++iter) - connect (*iter); -} - -void -OverlayImpl::activate (PeerFinder::Slot::ptr const& slot) -{ - m_journal.trace << - "Activate " << slot->remote_endpoint(); - - std::lock_guard lock (m_mutex); - - PeersBySlot::iterator const iter (m_peers.find (slot)); - assert (iter != m_peers.end ()); - PeerImp::ptr const peer (iter->second.lock()); - assert (peer != nullptr); - peer->activate (); -} - -void -OverlayImpl::send (PeerFinder::Slot::ptr const& slot, - std::vector const& endpoints) -{ - typedef std::vector List; - protocol::TMEndpoints tm; - for (List::const_iterator iter (endpoints.begin()); - iter != endpoints.end(); ++iter) - { - PeerFinder::Endpoint const& ep (*iter); - protocol::TMEndpoint& tme (*tm.add_endpoints()); - if (ep.address.is_v4()) - tme.mutable_ipv4()->set_ipv4( - beast::toNetworkByteOrder (ep.address.to_v4().value)); - else - tme.mutable_ipv4()->set_ipv4(0); - tme.mutable_ipv4()->set_ipv4port (ep.address.port()); - - tme.set_hops (ep.hops); - } - - tm.set_version (1); - - Message::pointer msg ( - std::make_shared ( - tm, protocol::mtENDPOINTS)); - - { - std::lock_guard lock (m_mutex); - PeersBySlot::iterator const iter (m_peers.find (slot)); - assert (iter != m_peers.end ()); - PeerImp::ptr const peer (iter->second.lock()); - assert (peer != nullptr); - peer->send (msg); - } -} - -void -OverlayImpl::disconnect (PeerFinder::Slot::ptr const& slot, bool graceful) -{ - if (m_journal.trace) m_journal.trace << - "Disconnect " << slot->remote_endpoint () << - (graceful ? " gracefully" : ""); - - std::lock_guard lock (m_mutex); - - PeersBySlot::iterator const iter (m_peers.find (slot)); - assert (iter != m_peers.end ()); - PeerImp::ptr const peer (iter->second.lock()); - assert (peer != nullptr); - peer->close (graceful); - //peer->detach ("disc", false); -} - //-------------------------------------------------------------------------- // // Stoppable @@ -405,6 +318,10 @@ OverlayImpl::onPrepare () void OverlayImpl::onStart () { + // mutex not needed since we aren't running + ++m_child_count; + boost::asio::spawn (m_io_service, std::bind ( + &OverlayImpl::do_timer, this, std::placeholders::_1)); } /** Close all peer connections. @@ -423,13 +340,16 @@ OverlayImpl::close_all (bool graceful) // ~PeerImp is pre-empted before it calls m_peers.remove() // if (peer != nullptr) - peer->close (graceful); + peer->close(); } } void OverlayImpl::onStop () { + error_code ec; + timer_.cancel(ec); + if (m_doorDirect) m_doorDirect->stop(); if (m_doorProxy) @@ -467,7 +387,7 @@ OverlayImpl::onWrite (beast::PropertyStream::Map& stream) are known. */ void -OverlayImpl::onPeerActivated (Peer::ptr const& peer) +OverlayImpl::activate (Peer::ptr const& peer) { std::lock_guard lock (m_mutex); @@ -561,6 +481,56 @@ OverlayImpl::findPeerByShortID (Peer::ShortId const& id) //------------------------------------------------------------------------------ +void +OverlayImpl::autoconnect() +{ + auto const result = m_peerFinder->autoconnect(); + for (auto addr : result) + connect (addr); +} + +void +OverlayImpl::sendpeers() +{ + auto const result = m_peerFinder->sendpeers(); + for (auto const& e : result) + { + // VFALCO TODO Make sure this doesn't race with closing the peer + PeerImp::ptr peer; + { + std::lock_guard lock (m_mutex); + PeersBySlot::iterator const iter = m_peers.find (e.first); + if (iter != m_peers.end()) + peer = iter->second.lock(); + } + if (peer) + peer->send_endpoints (e.second.begin(), e.second.end()); + } +} + +void +OverlayImpl::do_timer (yield_context yield) +{ + for(;;) + { + m_peerFinder->once_per_second(); + sendpeers(); + autoconnect(); + + timer_.expires_from_now (std::chrono::seconds(1)); + error_code ec; + timer_.async_wait (yield[ec]); + if (ec == boost::asio::error::operation_aborted) + break; + } + + // Take off a reference + std::lock_guard lock (m_mutex); + release(); +} + +//------------------------------------------------------------------------------ + std::unique_ptr make_Overlay ( beast::Stoppable& parent, diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h index ce9e3b0227..b60a898215 100644 --- a/src/ripple/overlay/impl/OverlayImpl.h +++ b/src/ripple/overlay/impl/OverlayImpl.h @@ -27,13 +27,14 @@ #include #include #include - #include #include - +#include +#include #include // #include #include +#include #include #include #include @@ -43,12 +44,13 @@ namespace ripple { class PeerDoor; class PeerImp; -class OverlayImpl - : public Overlay - , public PeerFinder::Callback +class OverlayImpl : public Overlay { private: - typedef boost::asio::ip::tcp::socket socket_type; + using clock_type = std::chrono::steady_clock; + using socket_type = boost::asio::ip::tcp::socket; + using error_code = boost::system::error_code; + using yield_context = boost::asio::yield_context; typedef hash_map > PeersBySlot; @@ -57,6 +59,7 @@ private: typedef hash_map PeerByShortId; + // VFALCO TODO Change to regular mutex and eliminate re-entrancy std::recursive_mutex m_mutex; // Blocks us until dependent objects have been destroyed @@ -72,6 +75,7 @@ private: boost::asio::io_service& m_io_service; boost::asio::ssl::context& m_ssl_context; + boost::asio::basic_waitable_timer timer_; /** Associates slots to peers. */ PeersBySlot m_peers; @@ -149,23 +153,6 @@ public: void remove (PeerFinder::Slot::ptr const& slot); - // - // PeerFinder::Callback - // - - void - connect (std::vector const& list); - - void - activate (PeerFinder::Slot::ptr const& slot); - - void - send (PeerFinder::Slot::ptr const& slot, - std::vector const& endpoints); - - void - disconnect (PeerFinder::Slot::ptr const& slot, bool graceful); - // // Stoppable // @@ -205,7 +192,7 @@ public: are known. */ void - onPeerActivated (Peer::ptr const& peer); + activate (Peer::ptr const& peer); /** A peer is being disconnected This is called during the disconnection of a known, activated peer. It @@ -216,6 +203,14 @@ public: onPeerDisconnect (Peer::ptr const& peer); private: + void + sendpeers(); + + void + autoconnect(); + + void + do_timer (yield_context yield); }; } // ripple diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 5a00cca4f9..3e37395b78 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -21,6 +21,7 @@ #include #include #include +#include namespace ripple { @@ -86,20 +87,15 @@ PeerImp::start () } void -PeerImp::activate () +PeerImp::close() { - assert (state_ == stateHandshaked); - state_ = stateActive; - assert(shortId_ == 0); - shortId_ = overlay_.next_id(); - overlay_.onPeerActivated(shared_from_this ()); -} + if (! strand_.running_in_this_thread()) + return strand_.post (std::bind ( + &PeerImp::close, shared_from_this())); -void -PeerImp::close (bool graceful) -{ - was_canceled_ = true; - detach ("stop", graceful); + error_code ec; + timer_.cancel (ec); + socket_->close(ec); } //------------------------------------------------------------------------------ @@ -309,10 +305,7 @@ void PeerImp::do_connect () usage_ = resourceManager_.newOutboundEndpoint (remote_address_); if (usage_.disconnect ()) - { - detach ("do_connect"); - return; - } + return detach ("do_connect"); boost::system::error_code ec; timer_.expires_from_now (nodeVerifySeconds, ec); @@ -321,8 +314,7 @@ void PeerImp::do_connect () if (ec) { journal_.error << "Failed to set verify timer."; - detach ("do_connect"); - return; + return detach ("do_connect"); } socket_->next_layer ().async_connect ( @@ -347,15 +339,15 @@ PeerImp::on_connect (error_code ec) journal_.error << "Connect to " << remote_address_ << " failed: " << ec.message(); - detach ("hc"); - return; + return detach ("hc"); } assert (state_ == stateConnecting); state_ = stateConnected; - peerFinder_.on_connected (slot_, - beast::IPAddressConversion::from_asio (local_endpoint)); + if (! peerFinder_.connected (slot_, + beast::IPAddressConversion::from_asio (local_endpoint))) + return detach("dup"); socket_->set_verify_mode (boost::asio::ssl::verify_none); socket_->async_handshake ( @@ -895,8 +887,6 @@ PeerImp::on_message (std::shared_ptr const& m) { error_code ec; - bool bDetach (true); - timer_.cancel (); std::uint32_t const ourTime (getApp().getOPs ().getNetworkTimeNC ()); @@ -915,6 +905,8 @@ PeerImp::on_message (std::shared_ptr const& m) auto protocol = BuildInfo::make_protocol(m->protoversion()); + // VFALCO TODO Report these failures in the HTTP response + if (m->has_nettime () && ((m->nettime () < minTime) || (m->nettime () > maxTime))) { @@ -974,46 +966,61 @@ PeerImp::on_message (std::shared_ptr const& m) "Connected to cluster node " << name_; assert (state_ == stateConnected); + // VFALCO TODO Remove this needless state state_ = stateHandshaked; - peerFinder_.on_handshake (slot_, RipplePublicKey (publicKey_), - clusterNode_); + auto const result = peerFinder_.activate (slot_, + RipplePublicKey (publicKey_), clusterNode_); - // XXX Set timer: connection is in grace period to be useful. - // XXX Set timer: connection idle (idle may vary depending on connection type.) - if ((hello_.has_ledgerclosed ()) && ( - hello_.ledgerclosed ().size () == (256 / 8))) + if (result == PeerFinder::Result::success) { - memcpy (closedLedgerHash_.begin (), - hello_.ledgerclosed ().data (), 256 / 8); + state_ = stateActive; + assert(shortId_ == 0); + shortId_ = overlay_.next_id(); + overlay_.activate(shared_from_this ()); - if ((hello_.has_ledgerprevious ()) && - (hello_.ledgerprevious ().size () == (256 / 8))) + // XXX Set timer: connection is in grace period to be useful. + // XXX Set timer: connection idle (idle may vary depending on connection type.) + if ((hello_.has_ledgerclosed ()) && ( + hello_.ledgerclosed ().size () == (256 / 8))) { - memcpy (previousLedgerHash_.begin (), - hello_.ledgerprevious ().data (), 256 / 8); - addLedger (previousLedgerHash_); - } - else - { - previousLedgerHash_.zero (); + memcpy (closedLedgerHash_.begin (), + hello_.ledgerclosed ().data (), 256 / 8); + + if ((hello_.has_ledgerprevious ()) && + (hello_.ledgerprevious ().size () == (256 / 8))) + { + memcpy (previousLedgerHash_.begin (), + hello_.ledgerprevious ().data (), 256 / 8); + addLedger (previousLedgerHash_); + } + else + { + previousLedgerHash_.zero(); + } } + + sendGetPeers(); + return ec; } - bDetach = false; + if (result == PeerFinder::Result::full) + { + // TODO Provide correct HTTP response + auto const redirects = peerFinder_.redirect (slot_); + send_endpoints (redirects.begin(), redirects.end()); + } + else + { + // TODO Duplicate connection + } } - if (bDetach) - { - //publicKey_.clear (); - //detach ("recvh"); - - ec = invalid_argument_error(); - } - else - { - sendGetPeers (); - } + // VFALCO Commented this out because we return an error code + // to the caller, who calls detach for us. + //publicKey_.clear (); + //detach ("recvh"); + ec = invalid_argument_error(); return ec; } @@ -2114,10 +2121,7 @@ PeerImp::detach (const char* rsn, bool graceful) // to have PeerFinder work reliably. detaching_ = true; // Race is ok. - if (was_canceled_) - peerFinder_.on_cancel (slot_); - else - peerFinder_.on_closed (slot_); + peerFinder_.on_closed (slot_); if (state_ == stateActive) overlay_.onPeerDisconnect (shared_from_this ()); diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 5883d74425..a274f2be80 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -169,9 +169,6 @@ private: // The slot assigned to us by PeerFinder PeerFinder::Slot::ptr slot_; - // True if close was called - bool was_canceled_ = false; - boost::asio::streambuf read_buffer_; boost::optional http_message_; boost::optional http_parser_; @@ -211,16 +208,9 @@ public: void start (); - /** Indicates that the peer must be activated. - A peer is activated after the handshake is completed and if it is not - a second connection from a peer that we already have. Once activated - the peer transitions to `stateActive` and begins operating. - */ + // Cancel all I/O and close the socket void - activate (); - - /** Close the connection. */ - void close (bool graceful); + close(); void getLedger (protocol::TMGetLedger& packet); @@ -232,6 +222,13 @@ public: void send (Message::pointer const& m) override; + /** Send a set of PeerFinder endpoints as a protocol message. */ + template ::value_type, + PeerFinder::Endpoint>::value>> + void + send_endpoints (FwdIt first, FwdIt last); + beast::IP::Endpoint getRemoteAddress() const override; @@ -246,7 +243,7 @@ public: getShortId () const override; RippleAddress const& - getNodePublic () const; + getNodePublic () const override; Json::Value json() override; @@ -512,6 +509,31 @@ private: //------------------------------------------------------------------------------ +template +void +PeerImp::send_endpoints (FwdIt first, FwdIt last) +{ + protocol::TMEndpoints tm; + for (;first != last; ++first) + { + auto const& ep = *first; + protocol::TMEndpoint& tme (*tm.add_endpoints()); + if (ep.address.is_v4()) + tme.mutable_ipv4()->set_ipv4( + beast::toNetworkByteOrder (ep.address.to_v4().value)); + else + tme.mutable_ipv4()->set_ipv4(0); + tme.mutable_ipv4()->set_ipv4port (ep.address.port()); + tme.set_hops (ep.hops); + } + tm.set_version (1); + + send (std::make_shared (tm, protocol::mtENDPOINTS)); +} + +//------------------------------------------------------------------------------ + +// DEPRECATED const boost::posix_time::seconds PeerImp::nodeVerifySeconds (15); //------------------------------------------------------------------------------ diff --git a/src/ripple/peerfinder/Manager.h b/src/ripple/peerfinder/Manager.h index 5d72c5f5be..50d5cfb496 100644 --- a/src/ripple/peerfinder/Manager.h +++ b/src/ripple/peerfinder/Manager.h @@ -103,31 +103,6 @@ typedef std::vector Endpoints; //------------------------------------------------------------------------------ -/** The Callback receives PeerFinder notifications. - The notifications are sent on a thread owned by the PeerFinder, - so it is best not to do too much work in here. Just post functor - to another worker thread or job queue and return. -*/ -// DEPRECATED Callbacks only cause re-entrancy pain -struct Callback -{ - /** Initiate outgoing Peer connections to the specified set of endpoints. */ - virtual void connect (IPAddresses const& addresses) = 0; - - /** Activate the handshaked peer with the specified address. */ - virtual void activate (Slot::ptr const& slot) = 0; - - /** Sends a set of Endpoint records to the specified peer. */ - virtual void send (Slot::ptr const& slot, Endpoints const& endpoints) = 0; - - /** Disconnect the handshaked peer with the specified address. - @param graceful `true` to wait for send buffers to drain before closing. - */ - virtual void disconnect (Slot::ptr const& slot, bool graceful) = 0; -}; - -//------------------------------------------------------------------------------ - /** Possible results from activating a slot. */ enum class Result { @@ -146,20 +121,16 @@ protected: public: /** Create a new Manager. */ - static Manager* New ( - Stoppable& parent, - SiteFiles::Manager& siteFiles, + static Manager* New (Stoppable& parent, beast::File const& pathToDbFileOrDirectory, - Callback& callback, - clock_type& clock, - beast::Journal journal); + clock_type& clock, beast::Journal journal); /** Destroy the object. Any pending source fetch operations are aborted. There may be some listener calls made before the destructor returns. */ - virtual ~Manager () { } + virtual ~Manager() = default; /** Set the configuration for the manager. The new settings will be applied asynchronously. @@ -207,19 +178,6 @@ public: virtual Slot::ptr new_outbound_slot ( beast::IP::Endpoint const& remote_endpoint) = 0; - /** Called when an outbound connection attempt succeeds. - The local endpoint must be valid. If the caller receives an error - when retrieving the local endpoint from the socket, it should - proceed as if the connection attempt failed by calling on_closed - instead of on_connected. - */ - virtual void on_connected (Slot::ptr const& slot, - beast::IP::Endpoint const& local_endpoint) = 0; - - /** Called when a handshake is completed. */ - virtual void on_handshake (Slot::ptr const& slot, - RipplePublicKey const& key, bool cluster) = 0; - /** Called when mtENDPOINTS is received. */ virtual void on_endpoints (Slot::ptr const& slot, Endpoints const& endpoints) = 0; @@ -233,11 +191,46 @@ public: */ virtual void on_closed (Slot::ptr const& slot) = 0; - /** Called when the slot is closed via canceling operations. - This is instead of on_closed. - */ - virtual void on_cancel (Slot::ptr const& slot) = 0; + //-------------------------------------------------------------------------- + /** Called when an outbound connection attempt succeeds. + The local endpoint must be valid. If the caller receives an error + when retrieving the local endpoint from the socket, it should + proceed as if the connection attempt failed by calling on_closed + instead of on_connected. + @return `true` if the connection should be kept + */ + virtual + bool + connected (Slot::ptr const& slot, + beast::IP::Endpoint const& local_endpoint) = 0; + + /** Request an active slot type. */ + virtual + Result + activate (Slot::ptr const& slot, + RipplePublicKey const& key, bool cluster) = 0; + + /** Returns a set of endpoints suitable for redirection. */ + virtual + std::vector + redirect (Slot::ptr const& slot) = 0; + + /** Return a set of addresses we should connect to. */ + virtual + std::vector + autoconnect() = 0; + + virtual + std::vector>> + sendpeers() = 0; + + /** Perform periodic activity. + This should be called once per second. + */ + virtual + void + once_per_second() = 0; }; } diff --git a/src/ripple/peerfinder/impl/Checker.cpp b/src/ripple/peerfinder/impl/Checker.cpp deleted file mode 100644 index 3f0e7ed73a..0000000000 --- a/src/ripple/peerfinder/impl/Checker.cpp +++ /dev/null @@ -1,181 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace ripple { -namespace PeerFinder { - -class CheckerImp - : public Checker - , private beast::Thread - , private beast::LeakChecked -{ -private: - class Request; - - struct State - { - beast::List list; - }; - - typedef beast::SharedData SharedState; - - SharedState m_state; - boost::asio::io_service m_io_service; - boost::optional m_work; - - //-------------------------------------------------------------------------- - - class Request - : public beast::SharedObject - , public beast::List ::Node - , private beast::LeakChecked - { - public: - typedef beast::SharedPtr Ptr; - typedef boost::asio::ip::tcp Protocol; - typedef boost::system::error_code error_code; - typedef Protocol::socket socket_type; - typedef Protocol::endpoint endpoint_type; - - CheckerImp& m_owner; - boost::asio::io_service& m_io_service; - beast::IP::Endpoint m_address; - beast::asio::shared_handler m_handler; - socket_type m_socket; - boost::system::error_code m_error; - bool m_canAccept; - - Request (CheckerImp& owner, boost::asio::io_service& io_service, - beast::IP::Endpoint const& address, beast::asio::shared_handler < - void (Result)> const& handler) - : m_owner (owner) - , m_io_service (io_service) - , m_address (address) - , m_handler (handler) - , m_socket (m_io_service) - , m_canAccept (false) - { - m_owner.add (*this); - - m_socket.async_connect (beast::IPAddressConversion::to_asio_endpoint ( - m_address), beast::asio::wrap_handler (std::bind ( - &Request::handle_connect, Ptr(this), - beast::asio::placeholders::error), m_handler)); - } - - ~Request () - { - Result result; - result.address = m_address; - result.error = m_error; - m_io_service.wrap (m_handler) (result); - - m_owner.remove (*this); - } - - void cancel () - { - m_socket.cancel(); - } - - void handle_connect (boost::system::error_code ec) - { - m_error = ec; - if (ec) - return; - - m_canAccept = true; - } - }; - - //-------------------------------------------------------------------------- - - void add (Request& request) - { - SharedState::Access state (m_state); - state->list.push_back (request); - } - - void remove (Request& request) - { - SharedState::Access state (m_state); - state->list.erase (state->list.iterator_to (request)); - } - - void run () - { - m_io_service.run (); - } - -public: - CheckerImp () - : Thread ("PeerFinder::Checker") - , m_work (boost::in_place (std::ref (m_io_service))) - { - startThread (); - } - - ~CheckerImp () - { - // cancel pending i/o - cancel(); - - // destroy the io_service::work object - m_work = boost::none; - - // signal and wait for the thread to exit gracefully - stopThread (); - } - - void cancel () - { - SharedState::Access state (m_state); - for (beast::List ::iterator iter (state->list.begin()); - iter != state->list.end(); ++iter) - iter->cancel(); - } - - void async_test (beast::IP::Endpoint const& endpoint, - beast::asio::shared_handler handler) - { - new Request (*this, m_io_service, endpoint, handler); - } -}; - -//------------------------------------------------------------------------------ - -Checker* Checker::New () -{ - return new CheckerImp; -} - -} -} diff --git a/src/ripple/peerfinder/impl/Checker.h b/src/ripple/peerfinder/impl/Checker.h index 707324a4e1..dae5898bdb 100644 --- a/src/ripple/peerfinder/impl/Checker.h +++ b/src/ripple/peerfinder/impl/Checker.h @@ -20,20 +20,79 @@ #ifndef RIPPLE_PEERFINDER_CHECKER_H_INCLUDED #define RIPPLE_PEERFINDER_CHECKER_H_INCLUDED -#include +#include +#include +#include +#include +#include +#include #include +#include +#include +#include +#include namespace ripple { namespace PeerFinder { /** Tests remote listening sockets to make sure they are connectible. */ + template class Checker { +private: + using error_code = boost::system::error_code; + + struct basic_async_op : boost::intrusive::list_base_hook < + boost::intrusive::link_mode > + { + virtual + ~basic_async_op() = default; + + virtual + void + stop() = 0; + + virtual + void + operator() (error_code const& ec) = 0; + }; + + template + struct async_op : basic_async_op + { + using socket_type = typename Protocol::socket; + using endpoint_type = typename Protocol::endpoint; + + Checker& checker_; + socket_type socket_; + Handler handler_; + + async_op (Checker& owner, boost::asio::io_service& io_service, + Handler&& handler); + + ~async_op(); + + void + stop() override; + + void + operator() (error_code const& ec) override; + }; + + //-------------------------------------------------------------------------- + + using list_type = typename boost::intrusive::make_list >::type; + + std::mutex mutex_; + std::condition_variable cond_; + boost::asio::io_service& io_service_; + list_type list_; + bool stop_ = false; + public: - /** Create the service. - This will automatically start the associated thread and io_service. - */ - static Checker* New (); + explicit + Checker (boost::asio::io_service& io_service); /** Destroy the service. Any pending I/O operations will be canceled. This call blocks until @@ -41,40 +100,134 @@ public: operation_aborted) and the associated thread and io_service have no more work remaining. */ - virtual ~Checker () { } + ~Checker(); - /** Cancel pending I/O. + /** Stop the service. + Pending I/O operations will be canceled. This issues cancel orders for all pending I/O operations and then returns immediately. Handlers will receive operation_aborted errors, or if they were already queued they will complete normally. */ - virtual void cancel () = 0; + void + stop(); - struct Result - { - Result () - : canAccept (false) - { } - - /** The original address. */ - beast::IP::Endpoint address; - - /** The error code from the operation. */ - boost::system::error_code error; - - /** `true` if the endpoint is reachable, else `false`. - Only defined if no error occurred. - */ - bool canAccept; - }; + /** Block until all pending I/O completes. */ + void + wait(); /** Performs an async connection test on the specified endpoint. - The port must be non-zero. + The port must be non-zero. Note that the execution guarantees + offered by asio handlers are NOT enforced. */ - virtual void async_test (beast::IP::Endpoint const& endpoint, - beast::asio::shared_handler handler) = 0; + template + void + async_connect (beast::IP::Endpoint const& endpoint, Handler&& handler); + +private: + void + remove (basic_async_op& op); }; +//------------------------------------------------------------------------------ + +template +template +Checker::async_op::async_op (Checker& owner, + boost::asio::io_service& io_service, Handler&& handler) + : checker_ (owner) + , socket_ (io_service) + , handler_ (std::forward(handler)) +{ +} + +template +template +Checker::async_op::~async_op() +{ + checker_.remove (*this); +} + +template +template +void +Checker::async_op::stop() +{ + error_code ec; + socket_.cancel(ec); +} + +template +template +void +Checker::async_op::operator() ( + error_code const& ec) +{ + handler_(ec); +} + +//------------------------------------------------------------------------------ + +template +Checker::Checker (boost::asio::io_service& io_service) + : io_service_(io_service) +{ +} + +template +Checker::~Checker() +{ + wait(); +} + +template +void +Checker::stop() +{ + std::lock_guard lock (mutex_); + if (! stop_) + { + stop_ = true; + for (auto& c : list_) + c.stop(); + } +} + +template +void +Checker::wait() +{ + std::unique_lock lock (mutex_); + while (! list_.empty()) + cond_.wait (lock); +} + +template +template +void +Checker::async_connect ( + beast::IP::Endpoint const& endpoint, Handler&& handler) +{ + auto const op = std::make_shared> ( + *this, io_service_, std::forward(handler)); + { + std::lock_guard lock (mutex_); + list_.push_back (*op); + } + op->socket_.async_connect (beast::IPAddressConversion::to_asio_endpoint ( + endpoint), std::bind (&basic_async_op::operator(), op, + beast::asio::placeholders::error)); +} + +template +void +Checker::remove (basic_async_op& op) +{ + std::lock_guard lock (mutex_); + list_.erase (list_.iterator_to (op)); + if (list_.size() == 0) + cond_.notify_all(); +} + } } diff --git a/src/ripple/peerfinder/impl/CheckerAdapter.h b/src/ripple/peerfinder/impl/CheckerAdapter.h deleted file mode 100644 index f056b7e1d3..0000000000 --- a/src/ripple/peerfinder/impl/CheckerAdapter.h +++ /dev/null @@ -1,105 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef RIPPLE_PEERFINDER_CHECKERADAPTER_H_INCLUDED -#define RIPPLE_PEERFINDER_CHECKERADAPTER_H_INCLUDED - -#include -#include -#include -#include - -namespace ripple { -namespace PeerFinder { - -// Ensures that all Logic member function entry points are -// called while holding a lock on the recursive mutex. -// -typedef beast::ScopedWrapperContext < - beast::RecursiveMutex, beast::RecursiveMutex::ScopedLockType> SerializedContext; - -/** Adapts a ServiceQueue to dispatch Checker handler completions. - This lets the Logic have its Checker handler get dispatched - on the ServiceQueue instead of an io_service thread. Otherwise, - Logic would need a ServiceQueue to dispatch from its handler. -*/ -class CheckerAdapter : public Checker -{ -private: - SerializedContext& m_context; - beast::ServiceQueue& m_queue; - std::unique_ptr m_checker; - - struct Handler - { - SerializedContext& m_context; - beast::ServiceQueue& m_queue; - beast::asio::shared_handler m_handler; - - Handler ( - SerializedContext& context, - beast::ServiceQueue& queue, - beast::asio::shared_handler const& handler) - : m_context (context) - , m_queue (queue) - , m_handler (handler) - { } - - void operator() (Checker::Result result) - { - // VFALCO TODO Fix this, it is surely wrong but - // this supposedly correct line doesn't compile - //m_queue.wrap (m_context.wrap (m_handler)) (result); - - // WRONG - m_queue.wrap (m_handler) (result); - } - }; - -public: - CheckerAdapter (SerializedContext& context, beast::ServiceQueue& queue) - : m_context (context) - , m_queue (queue) - , m_checker (Checker::New()) - { - } - - ~CheckerAdapter () - { - // Have to do this before other fields get destroyed - m_checker = nullptr; - } - - void cancel () - { - m_checker->cancel(); - } - - void async_test (beast::IP::Endpoint const& endpoint, - beast::asio::shared_handler handler) - { - m_checker->async_test (endpoint, Handler ( - m_context, m_queue, handler)); - } -}; - -} -} - -#endif diff --git a/src/ripple/peerfinder/impl/Handouts.h b/src/ripple/peerfinder/impl/Handouts.h index 132d3ff5d6..5747d5a76f 100644 --- a/src/ripple/peerfinder/impl/Handouts.h +++ b/src/ripple/peerfinder/impl/Handouts.h @@ -116,6 +116,11 @@ public: return slot_; } + std::vector & list() + { + return list_; + } + std::vector const& list() const { return list_; diff --git a/src/ripple/peerfinder/impl/Logic.h b/src/ripple/peerfinder/impl/Logic.h index 57cde0e07b..a9b35f4e3e 100644 --- a/src/ripple/peerfinder/impl/Logic.h +++ b/src/ripple/peerfinder/impl/Logic.h @@ -28,9 +28,8 @@ #include #include #include - #include - +#include #include #include @@ -41,6 +40,7 @@ namespace PeerFinder { We keep this in a separate class so it can be instantiated for unit tests. */ +template class Logic { public: @@ -113,7 +113,6 @@ public: beast::Journal m_journal; SharedState m_state; clock_type& m_clock; - Callback& m_callback; Store& m_store; Checker& m_checker; @@ -126,29 +125,24 @@ public: //-------------------------------------------------------------------------- - Logic ( - clock_type& clock, - Callback& callback, - Store& store, - Checker& checker, - beast::Journal journal) + Logic (clock_type& clock, Store& store, + Checker& checker, beast::Journal journal) : m_journal (journal, Reporting::logic) , m_state (&store, std::ref (clock), journal) , m_clock (clock) - , m_callback (callback) , m_store (store) , m_checker (checker) , m_whenBroadcast (m_clock.now()) , m_squelches (m_clock) { - setConfig (Config ()); + config ({}); } // Load persistent state information from the Store // void load () { - SharedState::Access state (m_state); + typename SharedState::Access state (m_state); state->bootcache.load (); } @@ -161,7 +155,7 @@ public: */ void stop () { - SharedState::Access state (m_state); + typename SharedState::Access state (m_state); state->stopping = true; if (state->fetchSource != nullptr) state->fetchSource->cancel (); @@ -173,17 +167,19 @@ public: // //-------------------------------------------------------------------------- - void setConfig (Config const& config) + void + config (Config const& c) { - SharedState::Access state (m_state); - state->config = config; + typename SharedState::Access state (m_state); + state->config = c; state->counts.onConfig (state->config); } - void addFixedPeer (std::string const& name, + void + addFixedPeer (std::string const& name, std::vector const& addresses) { - SharedState::Access state (m_state); + typename SharedState::Access state (m_state); if (addresses.empty ()) { @@ -213,12 +209,12 @@ public: // Called when the Checker completes a connectivity test void checkComplete (beast::IP::Endpoint const& remoteAddress, beast::IP::Endpoint const& checkedAddress, - Checker::Result const& result) + boost::system::error_code ec) { - if (result.error == boost::asio::error::operation_aborted) + if (ec == boost::asio::error::operation_aborted) return; - SharedState::Access state (m_state); + typename SharedState::Access state (m_state); Slots::iterator const iter (state->slots.find (remoteAddress)); SlotImp& slot (*iter->second); @@ -231,38 +227,23 @@ public: return; } - // Mark that a check for this slot is finished. + slot.checked = true; slot.connectivityCheckInProgress = false; - if (! result.error) - { - slot.checked = true; - slot.canAccept = result.canAccept; - - if (slot.canAccept) - { - if (m_journal.debug) m_journal.debug << beast::leftw (18) << - "Logic testing " << checkedAddress << " succeeded"; - } - else - { - if (m_journal.info) m_journal.info << beast::leftw (18) << - "Logic testing " << checkedAddress << " failed"; - } - } - else + if (ec) { // VFALCO TODO Should we retry depending on the error? - slot.checked = true; slot.canAccept = false; - if (m_journal.error) m_journal.error << beast::leftw (18) << "Logic testing " << iter->first << " with error, " << - result.error.message(); + ec.message(); + state->bootcache.on_failure (checkedAddress); + return; } - if (! slot.canAccept) - state->bootcache.on_failure (checkedAddress); + slot.canAccept = true; + if (m_journal.debug) m_journal.debug << beast::leftw (18) << + "Logic testing " << checkedAddress << " succeeded"; } //-------------------------------------------------------------------------- @@ -274,7 +255,7 @@ public: "Logic accept" << remote_endpoint << " on local " << local_endpoint; - SharedState::Access state (m_state); + typename SharedState::Access state (m_state); // Check for duplicate connection { @@ -346,7 +327,7 @@ public: if (m_journal.debug) m_journal.debug << beast::leftw (18) << "Logic connect " << remote_endpoint; - SharedState::Access state (m_state); + typename SharedState::Access state (m_state); // Check for duplicate connection if (state->slots.find (remote_endpoint) != @@ -378,14 +359,15 @@ public: return result.first->second; } - void on_connected (SlotImp::ptr const& slot, + bool + connected (SlotImp::ptr const& slot, beast::IP::Endpoint const& local_endpoint) { if (m_journal.trace) m_journal.trace << beast::leftw (18) << "Logic connected" << slot->remote_endpoint () << " on local " << local_endpoint; - SharedState::Access state (m_state); + typename SharedState::Access state (m_state); // The object must exist in our table assert (state->slots.find (slot->remote_endpoint ()) != @@ -403,8 +385,7 @@ public: if (m_journal.warning) m_journal.warning << beast::leftw (18) << "Logic dropping " << slot->remote_endpoint () << " as self connect"; - m_callback.disconnect (slot, false); - return; + return false; } } @@ -412,16 +393,18 @@ public: state->counts.remove (*slot); slot->state (Slot::connected); state->counts.add (*slot); + return true; } - void on_handshake (SlotImp::ptr const& slot, + Result + activate (SlotImp::ptr const& slot, RipplePublicKey const& key, bool cluster) { if (m_journal.debug) m_journal.debug << beast::leftw (18) << "Logic handshake " << slot->remote_endpoint () << " with " << (cluster ? "clustered " : "") << "key " << key; - SharedState::Access state (m_state); + typename SharedState::Access state (m_state); // The object must exist in our table assert (state->slots.find (slot->remote_endpoint ()) != @@ -432,68 +415,282 @@ public: // Check for duplicate connection by key if (state->keys.find (key) != state->keys.end()) - { - m_callback.disconnect (slot, true); - return; - } + return Result::duplicate; // See if we have an open space for this slot - if (state->counts.can_activate (*slot)) + if (! state->counts.can_activate (*slot)) { - // Set key and cluster right before adding to the map - // otherwise we could assert later when erasing the key. - state->counts.remove (*slot); - slot->public_key (key); - slot->cluster (cluster); - state->counts.add (*slot); - - // Add the public key to the active set - std::pair const result ( - state->keys.insert (key)); - // Public key must not already exist - assert (result.second); - (void) result.second; - - // Change state and update counts - state->counts.remove (*slot); - slot->activate (m_clock.now ()); - state->counts.add (*slot); - if (! slot->inbound()) state->bootcache.on_success ( slot->remote_endpoint()); + return Result::full; + } - // Mark fixed slot success - if (slot->fixed() && ! slot->inbound()) + // Set key and cluster right before adding to the map + // otherwise we could assert later when erasing the key. + state->counts.remove (*slot); + slot->public_key (key); + slot->cluster (cluster); + state->counts.add (*slot); + + // Add the public key to the active set + std::pair const result ( + state->keys.insert (key)); + // Public key must not already exist + assert (result.second); + + // Change state and update counts + state->counts.remove (*slot); + slot->activate (m_clock.now ()); + state->counts.add (*slot); + + if (! slot->inbound()) + state->bootcache.on_success ( + slot->remote_endpoint()); + + // Mark fixed slot success + if (slot->fixed() && ! slot->inbound()) + { + auto iter (state->fixed.find (slot->remote_endpoint())); + assert (iter != state->fixed.end ()); + iter->second.success (m_clock.now ()); + if (m_journal.trace) m_journal.trace << beast::leftw (18) << + "Logic fixed " << slot->remote_endpoint () << " success"; + } + + return Result::success; + } + + /** Return a list of addresses suitable for redirection. + This is a legacy function, redirects should be returned in + the HTTP handshake and not via TMEndpoints. + */ + std::vector + redirect (SlotImp::ptr const& slot) + { + typename SharedState::Access state (m_state); + RedirectHandouts h (slot); + state->livecache.hops.shuffle(); + handout (&h, (&h)+1, + state->livecache.hops.begin(), + state->livecache.hops.end()); + return std::move(h.list()); + } + + /** Create new outbound connection attempts as needed. + This implements PeerFinder's "Outbound Connection Strategy" + */ + std::vector + autoconnect() + { + std::vector const none; + + typename SharedState::Access state (m_state); + + // Count how many more outbound attempts to make + // + auto needed (state->counts.attempts_needed ()); + if (needed == 0) + return none; + + ConnectHandouts h (needed, m_squelches); + + // Make sure we don't connect to already-connected entries. + squelch_slots (state); + + // 1. Use Fixed if: + // Fixed active count is below fixed count AND + // ( There are eligible fixed addresses to try OR + // Any outbound attempts are in progress) + // + if (state->counts.fixed_active() < state->fixed.size ()) + { + get_fixed (needed, h.list(), state); + + if (! h.list().empty ()) { - auto iter (state->fixed.find (slot->remote_endpoint())); - assert (iter != state->fixed.end ()); - iter->second.success (m_clock.now ()); - if (m_journal.trace) m_journal.trace << beast::leftw (18) << - "Logic fixed " << slot->remote_endpoint () << " success"; + if (m_journal.debug) m_journal.debug << beast::leftw (18) << + "Logic connect " << h.list().size() << " fixed"; + return h.list(); } - m_callback.activate (slot); + if (state->counts.attempts() > 0) + { + if (m_journal.debug) m_journal.debug << beast::leftw (18) << + "Logic waiting on " << + state->counts.attempts() << " attempts"; + return none; + } } - else + + // Only proceed if auto connect is enabled and we + // have less than the desired number of outbound slots + // + if (! state->config.autoConnect || + state->counts.out_active () >= state->counts.out_max ()) + return none; + + // 2. Use Livecache if: + // There are any entries in the cache OR + // Any outbound attempts are in progress + // { - if (! slot->inbound()) - state->bootcache.on_success ( - slot->remote_endpoint()); - - // Maybe give them some addresses to try - if (slot->inbound ()) - redirect (slot, state); - - m_callback.disconnect (slot, true); + state->livecache.hops.shuffle(); + handout (&h, (&h)+1, + state->livecache.hops.rbegin(), + state->livecache.hops.rend()); + if (! h.list().empty ()) + { + if (m_journal.debug) m_journal.debug << beast::leftw (18) << + "Logic connect " << h.list().size () << " live " << + ((h.list().size () > 1) ? "endpoints" : "endpoint"); + return h.list(); + } + else if (state->counts.attempts() > 0) + { + if (m_journal.debug) m_journal.debug << beast::leftw (18) << + "Logic waiting on " << + state->counts.attempts() << " attempts"; + return none; + } } + + /* 3. Bootcache refill + If the Bootcache is empty, try to get addresses from the current + set of Sources and add them into the Bootstrap cache. + + Pseudocode: + If ( domainNames.count() > 0 AND ( + unusedBootstrapIPs.count() == 0 + OR activeNameResolutions.count() > 0) ) + ForOneOrMore (DomainName that hasn't been resolved recently) + Contact DomainName and add entries to the unusedBootstrapIPs + return; + */ + + // 4. Use Bootcache if: + // There are any entries we haven't tried lately + // + for (auto iter (state->bootcache.begin()); + ! h.full() && iter != state->bootcache.end(); ++iter) + h.try_insert (*iter); + + if (! h.list().empty ()) + { + if (m_journal.debug) m_journal.debug << beast::leftw (18) << + "Logic connect " << h.list().size () << " boot " << + ((h.list().size () > 1) ? "addresses" : "address"); + return h.list(); + } + + // If we get here we are stuck + return none; + } + + std::vector>> + sendpeers() + { + std::vector>> result; + + typename SharedState::Access state (m_state); + + clock_type::time_point const now = m_clock.now(); + if (m_whenBroadcast <= now) + { + std::vector targets; + + { + // build list of active slots + std::vector slots; + slots.reserve (state->slots.size()); + std::for_each (state->slots.cbegin(), state->slots.cend(), + [&slots](Slots::value_type const& value) + { + if (value.second->state() == Slot::active) + slots.emplace_back (value.second); + }); + std::random_shuffle (slots.begin(), slots.end()); + + // build target vector + targets.reserve (slots.size()); + std::for_each (slots.cbegin(), slots.cend(), + [&targets](SlotImp::ptr const& slot) + { + targets.emplace_back (slot); + }); + } + + /* VFALCO NOTE + This is a temporary measure. Once we know our own IP + address, the correct solution is to put it into the Livecache + at hops 0, and go through the regular handout path. This way + we avoid handing our address out too frequenty, which this code + suffers from. + */ + // Add an entry for ourselves if: + // 1. We want incoming + // 2. We have slots + // 3. We haven't failed the firewalled test + // + if (state->config.wantIncoming && + state->counts.inboundSlots() > 0) + { + Endpoint ep; + ep.hops = 0; + ep.address = beast::IP::Endpoint ( + beast::IP::AddressV4 ()).at_port ( + state->config.listeningPort); + for (auto& t : targets) + t.insert (ep); + } + + // build sequence of endpoints by hops + state->livecache.hops.shuffle(); + handout (targets.begin(), targets.end(), + state->livecache.hops.begin(), + state->livecache.hops.end()); + + // broadcast + for (auto const& t : targets) + { + SlotImp::ptr const& slot = t.slot(); + auto const& list = t.list(); + if (m_journal.trace) m_journal.trace << beast::leftw (18) << + "Logic sending " << slot->remote_endpoint() << + " with " << list.size() << + ((list.size() == 1) ? " endpoint" : " endpoints"); + result.push_back (std::make_pair (slot, list)); + } + + m_whenBroadcast = now + Tuning::secondsPerMessage; + } + + return result; + } + + void once_per_second() + { + typename SharedState::Access state (m_state); + + // Expire the Livecache + state->livecache.expire (); + + // Expire the recent cache in each slot + for (auto const& entry : state->slots) + entry.second->expire(); + + // Expire the recent attempts table + beast::expire (m_squelches, + Tuning::recentAttemptDuration); + + state->bootcache.periodicActivity (); } //-------------------------------------------------------------------------- // Validate and clean up the list that we received from the slot. void preprocess (SlotImp::ptr const& slot, Endpoints& list, - SharedState::Access& state) + typename SharedState::Access& state) { bool neighbor (false); for (auto iter (list.begin()); iter != list.end();) @@ -570,7 +767,7 @@ public: " contained " << list.size () << ((list.size() > 1) ? " entries" : " entry"); - SharedState::Access state (m_state); + typename SharedState::Access state (m_state); // The object must exist in our table assert (state->slots.find (slot->remote_endpoint ()) != @@ -583,10 +780,8 @@ public: clock_type::time_point const now (m_clock.now()); - for (auto iter (list.cbegin()); iter != list.cend(); ++iter) + for (auto const& ep : list) { - Endpoint const& ep (*iter); - assert (ep.hops != 0); slot->recent.insert (ep.address, ep.hops); @@ -611,8 +806,8 @@ public: // Test the slot's listening port before // adding it to the livecache for the first time. // - m_checker.async_test (ep.address, std::bind ( - &Logic::checkComplete, this, slot->remote_endpoint (), + m_checker.async_connect (ep.address, std::bind ( + &Logic::checkComplete, this, slot->remote_endpoint(), ep.address, std::placeholders::_1)); // Note that we simply discard the first Endpoint @@ -644,13 +839,13 @@ public: void on_legacy_endpoints (IPAddresses const& list) { // Ignoring them also seems a valid choice. - SharedState::Access state (m_state); + typename SharedState::Access state (m_state); for (IPAddresses::const_iterator iter (list.begin()); iter != list.end(); ++iter) state->bootcache.insert (*iter); } - void remove (SlotImp::ptr const& slot, SharedState::Access& state) + void remove (SlotImp::ptr const& slot, typename SharedState::Access& state) { Slots::iterator const iter (state->slots.find ( slot->remote_endpoint ())); @@ -681,7 +876,7 @@ public: void on_closed (SlotImp::ptr const& slot) { - SharedState::Access state (m_state); + typename SharedState::Access state (m_state); remove (slot, state); @@ -728,20 +923,10 @@ public: } } - void on_cancel (SlotImp::ptr const& slot) - { - SharedState::Access state (m_state); - - remove (slot, state); - - if (m_journal.trace) m_journal.trace << beast::leftw (18) << - "Logic cancel " << slot->remote_endpoint(); - } - //-------------------------------------------------------------------------- // Returns `true` if the address matches a fixed slot address - bool fixed (beast::IP::Endpoint const& endpoint, SharedState::Access& state) const + bool fixed (beast::IP::Endpoint const& endpoint, typename SharedState::Access& state) const { for (auto const& entry : state->fixed) if (entry.first == endpoint) @@ -751,7 +936,7 @@ public: // Returns `true` if the address matches a fixed slot address // Note that this does not use the port information in the IP::Endpoint - bool fixed (beast::IP::Address const& address, SharedState::Access& state) const + bool fixed (beast::IP::Address const& address, typename SharedState::Access& state) const { for (auto const& entry : state->fixed) if (entry.first.address () == address) @@ -768,7 +953,7 @@ public: /** Adds eligible Fixed addresses for outbound attempts. */ template void get_fixed (std::size_t needed, Container& c, - SharedState::Access& state) + typename SharedState::Access& state) { auto const now (m_clock.now()); for (auto iter = state->fixed.begin (); @@ -791,7 +976,7 @@ public: //-------------------------------------------------------------------------- // Adds slot addresses to the squelched set - void squelch_slots (SharedState::Access& state) + void squelch_slots (typename SharedState::Access& state) { for (auto const& s : state->slots) { @@ -802,164 +987,20 @@ public: } } - /** Create new outbound connection attempts as needed. - This implements PeerFinder's "Outbound Connection Strategy" - */ - void autoconnect () - { - SharedState::Access state (m_state); - - // Count how many more outbound attempts to make - // - auto needed (state->counts.attempts_needed ()); - if (needed == 0) - return; - - ConnectHandouts h (needed, m_squelches); - - // Make sure we don't connect to already-connected entries. - squelch_slots (state); - - // 1. Use Fixed if: - // Fixed active count is below fixed count AND - // ( There are eligible fixed addresses to try OR - // Any outbound attempts are in progress) - // - if (state->counts.fixed_active() < state->fixed.size ()) - { - get_fixed (needed, h.list(), state); - - if (! h.list().empty ()) - { - if (m_journal.debug) m_journal.debug << beast::leftw (18) << - "Logic connect " << h.list().size() << " fixed"; - m_callback.connect (h.list()); - return; - } - - if (state->counts.attempts() > 0) - { - if (m_journal.debug) m_journal.debug << beast::leftw (18) << - "Logic waiting on " << - state->counts.attempts() << " attempts"; - return; - } - } - - // Only proceed if auto connect is enabled and we - // have less than the desired number of outbound slots - // - if (! state->config.autoConnect || - state->counts.out_active () >= state->counts.out_max ()) - return; - - // 2. Use Livecache if: - // There are any entries in the cache OR - // Any outbound attempts are in progress - // - { - state->livecache.hops.shuffle(); - handout (&h, (&h)+1, - state->livecache.hops.rbegin(), - state->livecache.hops.rend()); - if (! h.list().empty ()) - { - if (m_journal.debug) m_journal.debug << beast::leftw (18) << - "Logic connect " << h.list().size () << " live " << - ((h.list().size () > 1) ? "endpoints" : "endpoint"); - m_callback.connect (h.list()); - return; - } - else if (state->counts.attempts() > 0) - { - if (m_journal.debug) m_journal.debug << beast::leftw (18) << - "Logic waiting on " << - state->counts.attempts() << " attempts"; - return; - } - } - - /* 3. Bootcache refill - If the Bootcache is empty, try to get addresses from the current - set of Sources and add them into the Bootstrap cache. - - Pseudocode: - If ( domainNames.count() > 0 AND ( - unusedBootstrapIPs.count() == 0 - OR activeNameResolutions.count() > 0) ) - ForOneOrMore (DomainName that hasn't been resolved recently) - Contact DomainName and add entries to the unusedBootstrapIPs - return; - */ - - // 4. Use Bootcache if: - // There are any entries we haven't tried lately - // - for (auto iter (state->bootcache.begin()); - ! h.full() && iter != state->bootcache.end(); ++iter) - h.try_insert (*iter); - - if (! h.list().empty ()) - { - if (m_journal.debug) m_journal.debug << beast::leftw (18) << - "Logic connect " << h.list().size () << " boot " << - ((h.list().size () > 1) ? "addresses" : "address"); - m_callback.connect (h.list()); - return; - } - - // If we get here we are stuck - } - //-------------------------------------------------------------------------- - void addStaticSource (beast::SharedPtr const& source) + void + addStaticSource (beast::SharedPtr const& source) { fetch (source); } - void addSource (beast::SharedPtr const& source) + void + addSource (beast::SharedPtr const& source) { m_sources.push_back (source); } - //-------------------------------------------------------------------------- - - // Called periodically to sweep the livecache and remove aged out items. - void expire () - { - SharedState::Access state (m_state); - - // Expire the Livecache - state->livecache.expire (); - - // Expire the recent cache in each slot - for (auto const& entry : state->slots) - entry.second->expire(); - - // Expire the recent attempts table - beast::expire (m_squelches, - Tuning::recentAttemptDuration); - } - - // Called every so often to perform periodic tasks. - void periodicActivity () - { - SharedState::Access state (m_state); - - clock_type::time_point const now (m_clock.now()); - - autoconnect (); - expire (); - state->bootcache.periodicActivity (); - - if (m_whenBroadcast <= now) - { - broadcast (); - m_whenBroadcast = now + Tuning::secondsPerMessage; - } - } - //-------------------------------------------------------------------------- // // Bootcache livecache sources @@ -970,7 +1011,7 @@ public: // Returns `true` if the address is new. // bool addBootcacheAddress (beast::IP::Endpoint const& address, - SharedState::Access& state) + typename SharedState::Access& state) { return state->bootcache.insert (address); } @@ -981,7 +1022,7 @@ public: int addBootcacheAddresses (IPAddresses const& list) { int count (0); - SharedState::Access state (m_state); + typename SharedState::Access state (m_state); for (auto addr : list) { if (addBootcacheAddress (addr, state)) @@ -997,7 +1038,7 @@ public: { { - SharedState::Access state (m_state); + typename SharedState::Access state (m_state); if (state->stopping) return; state->fetchSource = source; @@ -1009,7 +1050,7 @@ public: source->fetch (results, m_journal); { - SharedState::Access state (m_state); + typename SharedState::Access state (m_state); if (state->stopping) return; state->fetchSource = nullptr; @@ -1050,103 +1091,6 @@ public: return true; } - //-------------------------------------------------------------------------- - - // Gives a slot a set of addresses to try next since we're full - void redirect (SlotImp::ptr const& slot, SharedState::Access& state) - { - RedirectHandouts h (slot); - state->livecache.hops.shuffle(); - handout (&h, (&h)+1, - state->livecache.hops.begin(), - state->livecache.hops.end()); - - if (! h.list().empty ()) - { - if (m_journal.trace) m_journal.trace << beast::leftw (18) << - "Logic redirect " << slot->remote_endpoint() << - " with " << h.list().size() << - ((h.list().size() == 1) ? " address" : " addresses"); - m_callback.send (slot, h.list()); - } - else - { - if (m_journal.warning) m_journal.warning << beast::leftw (18) << - "Logic deferred " << slot->remote_endpoint(); - } - } - - // Send mtENDPOINTS for each slot as needed - void broadcast () - { - SharedState::Access state (m_state); - - std::vector targets; - - { - // build list of active slots - std::vector slots; - slots.reserve (state->slots.size()); - std::for_each (state->slots.cbegin(), state->slots.cend(), - [&slots](Slots::value_type const& value) - { - if (value.second->state() == Slot::active) - slots.emplace_back (value.second); - }); - std::random_shuffle (slots.begin(), slots.end()); - - // build target vector - targets.reserve (slots.size()); - std::for_each (slots.cbegin(), slots.cend(), - [&targets](SlotImp::ptr const& slot) - { - targets.emplace_back (slot); - }); - } - - /* VFALCO NOTE - This is a temporary measure. Once we know our own IP - address, the correct solution is to put it into the Livecache - at hops 0, and go through the regular handout path. This way - we avoid handing our address out too frequenty, which this code - suffers from. - */ - // Add an entry for ourselves if: - // 1. We want incoming - // 2. We have slots - // 3. We haven't failed the firewalled test - // - if (state->config.wantIncoming && - state->counts.inboundSlots() > 0) - { - Endpoint ep; - ep.hops = 0; - ep.address = beast::IP::Endpoint ( - beast::IP::AddressV4 ()).at_port ( - state->config.listeningPort); - for (auto& t : targets) - t.insert (ep); - } - - // build sequence of endpoints by hops - state->livecache.hops.shuffle(); - handout (targets.begin(), targets.end(), - state->livecache.hops.begin(), - state->livecache.hops.end()); - - // broadcast - for (auto const& t : targets) - { - SlotImp::ptr const& slot (t.slot()); - auto const& list (t.list()); - if (m_journal.trace) m_journal.trace << beast::leftw (18) << - "Logic sending " << slot->remote_endpoint() << - " with " << list.size() << - ((list.size() == 1) ? " endpoint" : " endpoints"); - m_callback.send (slot, list); - } - } - //-------------------------------------------------------------------------- // // PropertyStream @@ -1175,7 +1119,7 @@ public: void onWrite (beast::PropertyStream::Map& map) { - SharedState::Access state (m_state); + typename SharedState::Access state (m_state); // VFALCO NOTE These ugly casts are needed because // of how std::size_t is declared on some linuxes @@ -1217,12 +1161,12 @@ public: State const& state () const { - return *SharedState::ConstAccess (m_state); + return *typename SharedState::ConstAccess (m_state); } Counts const& counts () const { - return SharedState::ConstAccess (m_state)->counts; + return typename SharedState::ConstAccess (m_state)->counts; } static std::string stateString (Slot::State state) @@ -1245,19 +1189,3 @@ public: } #endif - -/* - -- recent tables entries should last equal to the cache time to live -- never send a slot a message thats in its recent table at a lower hop count -- when sending a message to a slot, add it to its recent table at one lower hop count - -Giveaway logic - -When do we give away? - -- To one inbound connection when we redirect due to full - -- To all slots at every broadcast - -*/ diff --git a/src/ripple/peerfinder/impl/Manager.cpp b/src/ripple/peerfinder/impl/Manager.cpp index bc34b419ac..84594c2911 100644 --- a/src/ripple/peerfinder/impl/Manager.cpp +++ b/src/ripple/peerfinder/impl/Manager.cpp @@ -18,11 +18,13 @@ //============================================================================== #include -#include +#include #include #include #include -#include +#include +#include +#include #if DOXYGEN #include @@ -33,50 +35,56 @@ namespace PeerFinder { class ManagerImp : public Manager - , public beast::Thread - , public SiteFiles::Listener - , public beast::DeadlineTimer::Listener , public beast::LeakChecked { public: - beast::ServiceQueue m_queue; - SiteFiles::Manager& m_siteFiles; beast::File m_databaseFile; clock_type& m_clock; beast::Journal m_journal; StoreSqdb m_store; - SerializedContext m_context; - CheckerAdapter m_checker; - Logic m_logic; - beast::DeadlineTimer m_secondsTimer; - + Checker checker_; + Logic m_logic; + + // Temporary + std::thread thread_; + boost::asio::io_service io_service_; + boost::optional work_; + //-------------------------------------------------------------------------- ManagerImp ( Stoppable& stoppable, - SiteFiles::Manager& siteFiles, beast::File const& pathToDbFileOrDirectory, - Callback& callback, clock_type& clock, beast::Journal journal) : Manager (stoppable) - , Thread ("PeerFinder") - , m_siteFiles (siteFiles) , m_databaseFile (pathToDbFileOrDirectory) , m_clock (clock) , m_journal (journal) , m_store (journal) - , m_checker (m_context, m_queue) - , m_logic (clock, callback, m_store, m_checker, journal) - , m_secondsTimer (this) + , checker_ (io_service_) + , m_logic (clock, m_store, checker_, journal) { if (m_databaseFile.isDirectory ()) m_databaseFile = m_databaseFile.getChildFile("peerfinder.sqlite"); + + work_ = boost::in_place (std::ref(io_service_)); + thread_ = std::thread ([&]() { this->io_service_.run(); }); } - ~ManagerImp () + ~ManagerImp() { - stopThread (); + stop(); + } + + void + stop() + { + if (thread_.joinable()) + { + work_ = boost::none; + thread_.join(); + } } //-------------------------------------------------------------------------- @@ -87,28 +95,20 @@ public: void setConfig (Config const& config) { - m_queue.dispatch ( - m_context.wrap ( - std::bind (&Logic::setConfig, &m_logic, - config))); + m_logic.config (config); } void addFixedPeer (std::string const& name, std::vector const& addresses) { - m_queue.dispatch ( - m_context.wrap ( - std::bind (&Logic::addFixedPeer, &m_logic, - name, addresses))); + m_logic.addFixedPeer (name, addresses); } - void addFallbackStrings (std::string const& name, + void + addFallbackStrings (std::string const& name, std::vector const& strings) { - m_queue.dispatch ( - m_context.wrap ( - std::bind (&Logic::addStaticSource, &m_logic, - SourceStrings::New (name, strings)))); + m_logic.addStaticSource (SourceStrings::New (name, strings)); } void addFallbackURL (std::string const& name, std::string const& url) @@ -118,107 +118,81 @@ public: //-------------------------------------------------------------------------- - Slot::ptr new_inbound_slot ( + Slot::ptr + new_inbound_slot ( beast::IP::Endpoint const& local_endpoint, beast::IP::Endpoint const& remote_endpoint) { return m_logic.new_inbound_slot (local_endpoint, remote_endpoint); } - Slot::ptr new_outbound_slot (beast::IP::Endpoint const& remote_endpoint) + Slot::ptr + new_outbound_slot (beast::IP::Endpoint const& remote_endpoint) { return m_logic.new_outbound_slot (remote_endpoint); } - void on_connected (Slot::ptr const& slot, - beast::IP::Endpoint const& local_endpoint) - { - SlotImp::ptr impl (std::dynamic_pointer_cast (slot)); - m_logic.on_connected (impl, local_endpoint); - } - - void on_handshake (Slot::ptr const& slot, - RipplePublicKey const& key, bool cluster) - { - SlotImp::ptr impl (std::dynamic_pointer_cast (slot)); - m_logic.on_handshake (impl, key, cluster); - } - - void on_endpoints (Slot::ptr const& slot, - Endpoints const& endpoints) + void + on_endpoints (Slot::ptr const& slot, Endpoints const& endpoints) { SlotImp::ptr impl (std::dynamic_pointer_cast (slot)); m_logic.on_endpoints (impl, endpoints); } - void on_legacy_endpoints (IPAddresses const& addresses) + void + on_legacy_endpoints (IPAddresses const& addresses) { m_logic.on_legacy_endpoints (addresses); } - void on_closed (Slot::ptr const& slot) + void + on_closed (Slot::ptr const& slot) { SlotImp::ptr impl (std::dynamic_pointer_cast (slot)); m_logic.on_closed (impl); } - void on_cancel (Slot::ptr const& slot) + //-------------------------------------------------------------------------- + + bool + connected (Slot::ptr const& slot, + beast::IP::Endpoint const& local_endpoint) override { SlotImp::ptr impl (std::dynamic_pointer_cast (slot)); - m_logic.on_cancel (impl); + return m_logic.connected (impl, local_endpoint); } - //-------------------------------------------------------------------------- - // - // SiteFiles - // - //-------------------------------------------------------------------------- - - void parseBootstrapIPs (std::string const& name, SiteFiles::Section const& section) + Result + activate (Slot::ptr const& slot, + RipplePublicKey const& key, bool cluster) override { - std::size_t n (0); - for (SiteFiles::Section::DataType::const_iterator iter ( - section.data().begin()); iter != section.data().end(); ++iter) - { - std::string const& s (*iter); - beast::IP::Endpoint addr (beast::IP::Endpoint::from_string (s)); - if (is_unspecified (addr)) - addr = beast::IP::Endpoint::from_string_altform(s); - if (! is_unspecified (addr)) - { - // add IP::Endpoint to bootstrap cache - ++n; - } - } - - m_journal.info << - "Added " << n << - " bootstrap IPs from " << name; + SlotImp::ptr impl (std::dynamic_pointer_cast (slot)); + return m_logic.activate (impl, key, cluster); } - void parseFixedIPs (SiteFiles::Section const& section) + std::vector + redirect (Slot::ptr const& slot) override { - for (SiteFiles::Section::DataType::const_iterator iter ( - section.data().begin()); iter != section.data().end(); ++iter) - { - std::string const& s (*iter); - beast::IP::Endpoint addr (beast::IP::Endpoint::from_string (s)); - if (is_unspecified (addr)) - addr = beast::IP::Endpoint::from_string_altform(s); - if (! is_unspecified (addr)) - { - // add IP::Endpoint to fixed peers - } - } + SlotImp::ptr impl (std::dynamic_pointer_cast (slot)); + return m_logic.redirect (impl); } - void onSiteFileFetch ( - std::string const& name, SiteFiles::SiteFile const& siteFile) + std::vector + autoconnect() override { - parseBootstrapIPs (name, siteFile["ips"]); + return m_logic.autoconnect(); + } - //if (name == "local") - // parseFixedIPs (name, siteFile["ips_fixed"]); + void + once_per_second() override + { + m_logic.once_per_second(); + } + + std::vector>> + sendpeers() override + { + return m_logic.sendpeers(); } //-------------------------------------------------------------------------- @@ -231,20 +205,28 @@ public: { } - void onStart () + void + onStart() { - startThread(); + m_journal.debug << "Initializing"; + beast::Error error (m_store.open (m_databaseFile)); + if (error) + m_journal.fatal << + "Failed to open '" << m_databaseFile.getFullPathName() << "'"; + if (! error) + m_logic.load (); } void onStop () { m_journal.debug << "Stopping"; - m_checker.cancel (); - m_logic.stop (); - m_secondsTimer.cancel(); - m_queue.dispatch ( - m_context.wrap ( - std::bind (&Thread::signalThreadShouldExit, this))); + checker_.stop(); + m_logic.stop(); + /* + signalThreadShouldExit(); + m_queue.dispatch (m_context.wrap ( + std::bind (&Thread::signalThreadShouldExit, this))); + */ } //-------------------------------------------------------------------------- @@ -255,62 +237,8 @@ public: void onWrite (beast::PropertyStream::Map& map) { - SerializedContext::Scope scope (m_context); - m_logic.onWrite (map); } - - //-------------------------------------------------------------------------- - - void onDeadlineTimer (beast::DeadlineTimer& timer) - { - if (timer == m_secondsTimer) - { - m_queue.dispatch ( - m_context.wrap ( - std::bind (&Logic::periodicActivity, &m_logic))); - - m_secondsTimer.setExpiration (Tuning::secondsPerConnect); - } - } - - void init () - { - m_journal.debug << "Initializing"; - - beast::Error error (m_store.open (m_databaseFile)); - - if (error) - { - m_journal.fatal << - "Failed to open '" << m_databaseFile.getFullPathName() << "'"; - } - - if (! error) - { - m_logic.load (); - } - - m_secondsTimer.setExpiration (std::chrono::seconds (1)); - } - - void run () - { - m_journal.debug << "Started"; - - init (); - - m_siteFiles.addListener (*this); - - while (! this->threadShouldExit()) - { - m_queue.run_one(); - } - - m_siteFiles.removeListener (*this); - - stopped(); - } }; //------------------------------------------------------------------------------ @@ -321,16 +249,10 @@ Manager::Manager (Stoppable& parent) { } -Manager* Manager::New ( - Stoppable& parent, - SiteFiles::Manager& siteFiles, - beast::File const& databaseFile, - Callback& callback, - clock_type& clock, - beast::Journal journal) +Manager* Manager::New (Stoppable& parent, beast::File const& databaseFile, + clock_type& clock, beast::Journal journal) { - return new ManagerImp (parent, siteFiles, databaseFile, - callback, clock, journal); + return new ManagerImp (parent, databaseFile, clock, journal); } } diff --git a/src/ripple/peerfinder/impl/Source.h b/src/ripple/peerfinder/impl/Source.h index e85cda965b..4354f90ff9 100644 --- a/src/ripple/peerfinder/impl/Source.h +++ b/src/ripple/peerfinder/impl/Source.h @@ -20,6 +20,7 @@ #ifndef RIPPLE_PEERFINDER_SOURCE_H_INCLUDED #define RIPPLE_PEERFINDER_SOURCE_H_INCLUDED +#include #include namespace ripple { diff --git a/src/ripple/peerfinder/impl/StoreSqdb.h b/src/ripple/peerfinder/impl/StoreSqdb.h index 640951d2c0..d13ad4dfeb 100644 --- a/src/ripple/peerfinder/impl/StoreSqdb.h +++ b/src/ripple/peerfinder/impl/StoreSqdb.h @@ -21,6 +21,7 @@ #define RIPPLE_PEERFINDER_STORESQDB_H_INCLUDED #include +#include namespace ripple { namespace PeerFinder { diff --git a/src/ripple/peerfinder/sim/Tests.cpp b/src/ripple/peerfinder/sim/Tests.cpp index d4389e0cc0..eb986d844b 100644 --- a/src/ripple/peerfinder/sim/Tests.cpp +++ b/src/ripple/peerfinder/sim/Tests.cpp @@ -499,7 +499,7 @@ public: { } - void async_test (IP::Endpoint const& address, + void async_connect (IP::Endpoint const& address, asio::shared_handler handler) { Node* const node (m_network.find (address)); diff --git a/src/ripple/sitefiles/api/Manager.h b/src/ripple/sitefiles/api/Manager.h index a6856e9175..fba3f420a7 100644 --- a/src/ripple/sitefiles/api/Manager.h +++ b/src/ripple/sitefiles/api/Manager.h @@ -43,7 +43,7 @@ public: /** Destroy the object. Any pending fetch operations are aborted. */ - virtual ~Manager () { } + virtual ~Manager() = default; /** Adds a listener. */ virtual void addListener (Listener& listener) = 0; diff --git a/src/ripple/unity/peerfinder.cpp b/src/ripple/unity/peerfinder.cpp index fb1d70b759..03393994ca 100644 --- a/src/ripple/unity/peerfinder.cpp +++ b/src/ripple/unity/peerfinder.cpp @@ -26,7 +26,6 @@ #endif #include -#include #include #include #include