From 35f9499b6778576bbbe000ced2f0bbe18325cb31 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Sun, 2 Nov 2014 10:00:36 -0800 Subject: [PATCH] Fix Overlay stop on exit: The stop sequence for Overlay had a race condition where autoconnect could be called after close_all, resulting in a hang on exit. This resolves the problem by putting the close and timer operations on a strand: * Rename some Overlay members * Put close on strand and tidy up members * Use completion handler instead of coroutine for timer * Use App io_service in PeerFinder --- Builds/VisualStudio2013/RippleD.vcxproj | 2 + .../VisualStudio2013/RippleD.vcxproj.filters | 3 + src/ripple/overlay/impl/OverlayImpl.cpp | 247 ++++++++++-------- src/ripple/overlay/impl/OverlayImpl.h | 98 ++++--- src/ripple/overlay/impl/PeerImp.cpp | 6 +- src/ripple/overlay/impl/PeerImp.h | 11 +- src/ripple/peerfinder/Manager.h | 5 - src/ripple/peerfinder/impl/Manager.cpp | 40 ++- src/ripple/peerfinder/make_Manager.h | 39 +++ 9 files changed, 263 insertions(+), 188 deletions(-) create mode 100644 src/ripple/peerfinder/make_Manager.h diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj index a267e87dd..e617f09ff 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj +++ b/Builds/VisualStudio2013/RippleD.vcxproj @@ -2697,6 +2697,8 @@ + + diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters index a8f920bf2..c67f11064 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters @@ -3786,6 +3786,9 @@ ripple\peerfinder\impl + + ripple\peerfinder + ripple\peerfinder diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index 7f690bc0e..586a3985f 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #if DOXYGEN @@ -52,6 +53,64 @@ struct get_peer_json //------------------------------------------------------------------------------ +OverlayImpl::Child::Child (OverlayImpl& overlay) + : overlay_(overlay) +{ +} + +OverlayImpl::Child::~Child() +{ + overlay_.remove(*this); +} + +//------------------------------------------------------------------------------ + +OverlayImpl::Timer::Timer (OverlayImpl& overlay) + : Child(overlay) + , timer_(overlay_.io_service_) +{ +} + +void +OverlayImpl::Timer::close() +{ + error_code ec; + timer_.cancel(ec); +} + +void +OverlayImpl::Timer::run() +{ + error_code ec; + timer_.expires_from_now (std::chrono::seconds(1)); + timer_.async_wait(overlay_.strand_.wrap( + std::bind(&Timer::on_timer, shared_from_this(), + beast::asio::placeholders::error))); +} + +void +OverlayImpl::Timer::on_timer (error_code ec) +{ + if (ec || overlay_.isStopping()) + { + if (ec && ec != boost::asio::error::operation_aborted) + if (overlay_.journal_.error) overlay_.journal_.error << + "on_timer: " << ec.message(); + return; + } + + overlay_.m_peerFinder->once_per_second(); + overlay_.sendpeers(); + overlay_.autoconnect(); + + timer_.expires_from_now (std::chrono::seconds(1)); + timer_.async_wait(overlay_.strand_.wrap(std::bind( + &Timer::on_timer, shared_from_this(), + beast::asio::placeholders::error))); +} + +//------------------------------------------------------------------------------ + OverlayImpl::OverlayImpl ( Setup const& setup, Stoppable& parent, @@ -61,28 +120,30 @@ OverlayImpl::OverlayImpl ( Resolver& resolver, boost::asio::io_service& io_service) : Overlay (parent) + , io_service_ (io_service) + , work_ (boost::in_place(std::ref(io_service_))) + , strand_ (io_service_) , setup_(setup) - , m_child_count (1) - , m_journal (deprecatedLogs().journal("Overlay")) + , journal_ (deprecatedLogs().journal("Overlay")) , m_resourceManager (resourceManager) - , m_peerFinder (add (PeerFinder::Manager::New (*this, - pathToDbFileOrDirectory, get_seconds_clock (), - deprecatedLogs().journal("PeerFinder")))) - , m_io_service (io_service) - , timer_(io_service) + , m_peerFinder (PeerFinder::make_Manager (*this, io_service, + pathToDbFileOrDirectory, get_seconds_clock(), + deprecatedLogs().journal("PeerFinder"))) , m_resolver (resolver) , m_nextShortId (0) { + beast::PropertyStream::Source::add (m_peerFinder.get()); } OverlayImpl::~OverlayImpl () { + close(); + // Block until dependent objects have been destroyed. // This is just to catch improper use of the Stoppable API. // - std::unique_lock lock (m_mutex); - m_cond.wait (lock, [this] { - return this->m_child_count == 0; }); + std::unique_lock lock (mutex_); + cond_.wait (lock, [this] { return list_.empty(); }); } void @@ -114,31 +175,25 @@ OverlayImpl::accept (socket_type&& socket) *m_peerFinder, slot, setup_.context)); { - std::lock_guard lock (m_mutex); + std::lock_guard lock (mutex_); { std::pair const result ( m_peers.emplace (slot, peer)); assert (result.second); (void) result.second; } - ++m_child_count; + list_.emplace(peer.get(), peer); // This has to happen while holding the lock, // otherwise the socket might not be canceled during a stop. - peer->start (); + peer->start(); } } void OverlayImpl::connect (beast::IP::Endpoint const& remote_endpoint) { - if (isStopping()) - { - m_journal.debug << - "Skipping " << remote_endpoint << - " connect on stop"; - return; - } + assert(work_); PeerFinder::Slot::ptr const slot ( m_peerFinder->new_outbound_slot (remote_endpoint)); @@ -147,18 +202,18 @@ OverlayImpl::connect (beast::IP::Endpoint const& remote_endpoint) return; PeerImp::ptr const peer (std::make_shared ( - remote_endpoint, m_io_service, *this, m_resourceManager, + remote_endpoint, io_service_, *this, m_resourceManager, *m_peerFinder, slot, setup_.context)); { - std::lock_guard lock (m_mutex); + std::lock_guard lock (mutex_); { std::pair const result ( m_peers.emplace (slot, peer)); assert (result.second); (void) result.second; } - ++m_child_count; + list_.emplace(peer.get(), peer); // This has to happen while holding the lock, // otherwise the socket might not be canceled during a stop. @@ -174,41 +229,13 @@ OverlayImpl::next_id() //-------------------------------------------------------------------------- -// Check for the stopped condition -// Caller must hold the mutex -void -OverlayImpl::check_stopped () -{ - // To be stopped, child Stoppable objects must be stopped - // and the count of dependent objects must be zero - if (areChildrenStopped () && m_child_count == 0) - { - m_cond.notify_all (); - m_journal.info << - "Stopped."; - stopped (); - } -} - -// Decrement the count of dependent objects -// Caller must hold the mutex -void -OverlayImpl::release () -{ - if (--m_child_count == 0) - check_stopped (); -} - void OverlayImpl::remove (PeerFinder::Slot::ptr const& slot) { - std::lock_guard lock (m_mutex); - + std::lock_guard lock (mutex_); PeersBySlot::iterator const iter (m_peers.find (slot)); assert (iter != m_peers.end ()); m_peers.erase (iter); - - release(); } //-------------------------------------------------------------------------- @@ -290,62 +317,36 @@ OverlayImpl::onPrepare () if (! getConfig ().RUN_STANDALONE) { m_doorDirect = make_PeerDoor (*this, getConfig ().PEER_IP, - getConfig ().peerListeningPort, m_io_service); + getConfig ().peerListeningPort, io_service_); } } void OverlayImpl::onStart () { - // mutex not needed since we aren't running - ++m_child_count; - boost::asio::spawn (m_io_service, std::bind ( - &OverlayImpl::do_timer, this, std::placeholders::_1)); -} - -/** Close all peer connections. - If `graceful` is true then active - Requirements: - Caller must hold the mutex. -*/ -void -OverlayImpl::close_all (bool graceful) -{ - for (auto const& entry : m_peers) - { - PeerImp::ptr const peer (entry.second.lock()); - - // VFALCO The only case where the weak_ptr is expired should be if - // ~PeerImp is pre-empted before it calls m_peers.remove() - // - if (peer != nullptr) - peer->close(); - } + auto const timer = std::make_shared(*this); + std::lock_guard lock (mutex_); + list_.emplace(timer.get(), timer); + timer_ = timer; + timer->run(); } void OverlayImpl::onStop () { - error_code ec; - timer_.cancel(ec); - if (m_doorDirect) m_doorDirect->stop(); if (m_doorProxy) m_doorProxy->stop(); - std::lock_guard lock (m_mutex); - // Take off the extra count we added in the constructor - release(); - - close_all (false); + strand_.dispatch(std::bind(&OverlayImpl::close, this)); } void OverlayImpl::onChildrenStopped () { - std::lock_guard lock (m_mutex); - check_stopped (); + std::lock_guard lock (mutex_); + checkStopped (); } //-------------------------------------------------------------------------- @@ -368,7 +369,7 @@ OverlayImpl::onWrite (beast::PropertyStream::Map& stream) void OverlayImpl::activate (Peer::ptr const& peer) { - std::lock_guard lock (m_mutex); + std::lock_guard lock (mutex_); // Now track this peer { @@ -389,7 +390,7 @@ OverlayImpl::activate (Peer::ptr const& peer) (void) result.second; } - m_journal.debug << + journal_.debug << "activated " << peer->getRemoteAddress() << " (" << peer->getShortId() << ":" << RipplePublicKey(peer->getNodePublic()) << ")"; @@ -406,7 +407,7 @@ OverlayImpl::activate (Peer::ptr const& peer) void OverlayImpl::onPeerDisconnect (Peer::ptr const& peer) { - std::lock_guard lock (m_mutex); + std::lock_guard lock (mutex_); m_shortIdMap.erase (peer->getShortId ()); m_publicKeyMap.erase (peer->getNodePublic ()); } @@ -416,9 +417,9 @@ OverlayImpl::onPeerDisconnect (Peer::ptr const& peer) and are running the Ripple protocol. */ std::size_t -OverlayImpl::size () +OverlayImpl::size() { - std::lock_guard lock (m_mutex); + std::lock_guard lock (mutex_); return m_publicKeyMap.size (); } @@ -434,7 +435,7 @@ OverlayImpl::getActivePeers () { Overlay::PeerSequence ret; - std::lock_guard lock (m_mutex); + std::lock_guard lock (mutex_); ret.reserve (m_publicKeyMap.size ()); @@ -450,7 +451,7 @@ OverlayImpl::getActivePeers () Peer::ptr OverlayImpl::findPeerByShortID (Peer::ShortId const& id) { - std::lock_guard lock (m_mutex); + std::lock_guard lock (mutex_); PeerByShortId::iterator const iter ( m_shortIdMap.find (id)); if (iter != m_shortIdMap.end ()) @@ -461,11 +462,38 @@ OverlayImpl::findPeerByShortID (Peer::ShortId const& id) //------------------------------------------------------------------------------ void -OverlayImpl::autoconnect() +OverlayImpl::remove (Child& child) { - auto const result = m_peerFinder->autoconnect(); - for (auto addr : result) - connect (addr); + std::lock_guard lock(mutex_); + list_.erase(&child); + if (list_.empty()) + checkStopped(); +} + +void +OverlayImpl::close() +{ + std::lock_guard lock(mutex_); + if (work_) + { + work_ = boost::none; + for (auto& _ : list_) + { + auto const child = _.second.lock(); + // Happens when the child is about to be destroyed + if (child != nullptr) + child->close(); + } + } +} + +// Check for the stopped condition +// Caller must hold the mutex +void +OverlayImpl::checkStopped () +{ + if (isStopping() && areChildrenStopped () && list_.empty()) + stopped(); } void @@ -477,7 +505,7 @@ OverlayImpl::sendpeers() // VFALCO TODO Make sure this doesn't race with closing the peer PeerImp::ptr peer; { - std::lock_guard lock (m_mutex); + std::lock_guard lock (mutex_); PeersBySlot::iterator const iter = m_peers.find (e.first); if (iter != m_peers.end()) peer = iter->second.lock(); @@ -488,24 +516,11 @@ OverlayImpl::sendpeers() } void -OverlayImpl::do_timer (yield_context yield) +OverlayImpl::autoconnect() { - for(;;) - { - m_peerFinder->once_per_second(); - sendpeers(); - autoconnect(); - - timer_.expires_from_now (std::chrono::seconds(1)); - error_code ec; - timer_.async_wait (yield[ec]); - if (ec == boost::asio::error::operation_aborted) - break; - } - - // Take off a reference - std::lock_guard lock (m_mutex); - release(); + auto const result = m_peerFinder->autoconnect(); + for (auto addr : result) + connect (addr); } //------------------------------------------------------------------------------ diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h index 4e5968606..adcf8f781 100644 --- a/src/ripple/overlay/impl/OverlayImpl.h +++ b/src/ripple/overlay/impl/OverlayImpl.h @@ -31,6 +31,7 @@ #include #include #include +#include #include // #include #include @@ -46,38 +47,69 @@ class PeerImp; class OverlayImpl : public Overlay { +public: + class Child + { + protected: + OverlayImpl& overlay_; + + explicit + Child (OverlayImpl& overlay); + + virtual ~Child(); + + public: + virtual void close() = 0; + }; + private: using clock_type = std::chrono::steady_clock; using socket_type = boost::asio::ip::tcp::socket; using error_code = boost::system::error_code; using yield_context = boost::asio::yield_context; - typedef hash_map > PeersBySlot; + using PeersBySlot = hash_map >; - typedef hash_map PeerByPublicKey; + using PeerByPublicKey = hash_map; - typedef hash_map PeerByShortId; + using PeerByShortId = hash_map; + + struct Timer + : Child + , std::enable_shared_from_this + { + boost::asio::basic_waitable_timer timer_; + + explicit + Timer (OverlayImpl& overlay); + + void + close() override; + + void + run(); + + void + on_timer (error_code ec); + }; + + boost::asio::io_service& io_service_; + boost::optional work_; + boost::asio::io_service::strand strand_; + + std::recursive_mutex mutex_; // VFALCO use std::mutex + std::condition_variable_any cond_; + std::weak_ptr timer_; + boost::container::flat_map< + Child*, std::weak_ptr> list_; Setup setup_; - - // VFALCO TODO Change to regular mutex and eliminate re-entrancy - std::recursive_mutex m_mutex; - - // Blocks us until dependent objects have been destroyed - std::condition_variable_any m_cond; - - // Number of dependencies that must be destroyed before we can stop - std::size_t m_child_count; - - beast::Journal m_journal; + beast::Journal journal_; Resource::Manager& m_resourceManager; std::unique_ptr m_peerFinder; - boost::asio::io_service& m_io_service; - boost::asio::basic_waitable_timer timer_; - /** Associates slots to peers. */ PeersBySlot m_peers; @@ -165,32 +197,21 @@ private: //-------------------------------------------------------------------------- - void - check_stopped (); - // // Stoppable // void - onPrepare () override; + onPrepare() override; void - onStart () override; - - /** Close all peer connections. - If `graceful` is true then active - Requirements: - Caller must hold the mutex. - */ - void - close_all (bool graceful); + onStart() override; void - onStop () override; + onStop() override; void - onChildrenStopped () override; + onChildrenStopped() override; // // PropertyStream @@ -202,16 +223,19 @@ private: //-------------------------------------------------------------------------- void - release(); + remove (Child& child); + + void + close(); + + void + checkStopped (); void sendpeers(); void autoconnect(); - - void - do_timer (yield_context yield); }; } // ripple diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index d98714638..1da752760 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -30,7 +30,8 @@ PeerImp::PeerImp (socket_type&& socket, beast::IP::Endpoint remoteAddress, OverlayImpl& overlay, Resource::Manager& resourceManager, PeerFinder::Manager& peerFinder, PeerFinder::Slot::ptr const& slot, std::shared_ptr const& context) - : journal_ (deprecatedLogs().journal("Peer")) + : Child (overlay) + , journal_ (deprecatedLogs().journal("Peer")) , ssl_bundle_(std::make_unique( context, std::move(socket))) , socket_ (ssl_bundle_->socket) @@ -53,7 +54,8 @@ PeerImp::PeerImp (beast::IP::Endpoint remoteAddress, Resource::Manager& resourceManager, PeerFinder::Manager& peerFinder, PeerFinder::Slot::ptr const& slot, std::shared_ptr const& context) - : journal_ (deprecatedLogs().journal("Peer")) + : Child (overlay) + , journal_ (deprecatedLogs().journal("Peer")) , ssl_bundle_(std::make_unique( context, io_service)) , socket_ (ssl_bundle_->socket) diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 63864faea..dd08df9bf 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -62,6 +62,7 @@ std::ostream& operator<< (std::ostream& os, PeerImp const* peer); class PeerImp : public Peer , public std::enable_shared_from_this + , public OverlayImpl::Child , private beast::LeakChecked , private abstract_protocol_handler { @@ -202,15 +203,15 @@ public: // Begin asynchronous initiation function calls void - start (); - - // Cancel all I/O and close the socket - void - close(); + start(); void getLedger (protocol::TMGetLedger& packet); + // Cancel all I/O and close the socket + void + close() override; + // // Network // diff --git a/src/ripple/peerfinder/Manager.h b/src/ripple/peerfinder/Manager.h index f900e0fd7..50a8de7db 100644 --- a/src/ripple/peerfinder/Manager.h +++ b/src/ripple/peerfinder/Manager.h @@ -121,11 +121,6 @@ protected: explicit Manager (Stoppable& parent); public: - /** Create a new Manager. */ - static Manager* New (Stoppable& parent, - beast::File const& pathToDbFileOrDirectory, - clock_type& clock, beast::Journal journal); - /** Destroy the object. Any pending source fetch operations are aborted. There may be some listener calls made before the diff --git a/src/ripple/peerfinder/impl/Manager.cpp b/src/ripple/peerfinder/impl/Manager.cpp index 23c2a695e..a6ddb8509 100644 --- a/src/ripple/peerfinder/impl/Manager.cpp +++ b/src/ripple/peerfinder/impl/Manager.cpp @@ -24,6 +24,7 @@ #include #include #include +#include // #include #if DOXYGEN @@ -38,6 +39,8 @@ class ManagerImp , public beast::LeakChecked { public: + boost::asio::io_service &io_service_; + boost::optional work_; beast::File m_databaseFile; clock_type& m_clock; beast::Journal m_journal; @@ -45,19 +48,17 @@ public: Checker checker_; Logic m_logic; - // Temporary - std::thread thread_; - boost::asio::io_service io_service_; - boost::optional work_; - //-------------------------------------------------------------------------- ManagerImp ( Stoppable& stoppable, + boost::asio::io_service& io_service, beast::File const& pathToDbFileOrDirectory, clock_type& clock, beast::Journal journal) : Manager (stoppable) + , io_service_(io_service) + , work_(boost::in_place(std::ref(io_service_))) , m_databaseFile (pathToDbFileOrDirectory) , m_clock (clock) , m_journal (journal) @@ -67,23 +68,21 @@ public: { if (m_databaseFile.isDirectory ()) m_databaseFile = m_databaseFile.getChildFile("peerfinder.sqlite"); - - work_ = boost::in_place (std::ref(io_service_)); - thread_ = std::thread ([&]() { this->io_service_.run(); }); } ~ManagerImp() { - stop(); + close(); } void - stop() + close() { - if (thread_.joinable()) + if (work_) { work_ = boost::none; - thread_.join(); + checker_.stop(); + m_logic.stop(); } } @@ -228,15 +227,8 @@ public: void onStop () { - m_journal.debug << "Stopping"; - checker_.stop(); - m_logic.stop(); + close(); stopped(); - /* - signalThreadShouldExit(); - m_queue.dispatch (m_context.wrap ( - std::bind (&Thread::signalThreadShouldExit, this))); - */ } //-------------------------------------------------------------------------- @@ -259,10 +251,12 @@ Manager::Manager (Stoppable& parent) { } -Manager* Manager::New (Stoppable& parent, beast::File const& databaseFile, - clock_type& clock, beast::Journal journal) +std::unique_ptr +make_Manager (beast::Stoppable& parent, boost::asio::io_service& io_service, + beast::File const& databaseFile, clock_type& clock, beast::Journal journal) { - return new ManagerImp (parent, databaseFile, clock, journal); + return std::make_unique ( + parent, io_service, databaseFile, clock, journal); } } diff --git a/src/ripple/peerfinder/make_Manager.h b/src/ripple/peerfinder/make_Manager.h new file mode 100644 index 000000000..aaf6d507f --- /dev/null +++ b/src/ripple/peerfinder/make_Manager.h @@ -0,0 +1,39 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_PEERFINDER_MAKE_MANAGER_H_INCLUDED +#define RIPPLE_PEERFINDER_MAKE_MANAGER_H_INCLUDED + +#include +#include +#include + +namespace ripple { +namespace PeerFinder { + +/** Create a new Manager. */ +std::unique_ptr +make_Manager (beast::Stoppable& parent, boost::asio::io_service& io_service, + beast::File const& pathToDbFileOrDirectory, + clock_type& clock, beast::Journal journal); + +} +} + +#endif