Fix Overlay stop on exit:

The stop sequence for Overlay had a race condition where autoconnect could
be called after close_all, resulting in a hang on exit. This resolves the
problem by putting the close and timer operations on a strand:
* Rename some Overlay members
* Put close on strand and tidy up members
* Use completion handler instead of coroutine for timer
* Use App io_service in PeerFinder
This commit is contained in:
Vinnie Falco
2014-11-02 10:00:36 -08:00
parent db82c35c17
commit 35f9499b67
9 changed files with 263 additions and 188 deletions

View File

@@ -2697,6 +2697,8 @@
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Tuning.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\make_Manager.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\Manager.h">
</ClInclude>
<None Include="..\..\src\ripple\peerfinder\README.md">

View File

@@ -3786,6 +3786,9 @@
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Tuning.h">
<Filter>ripple\peerfinder\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\make_Manager.h">
<Filter>ripple\peerfinder</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\Manager.h">
<Filter>ripple\peerfinder</Filter>
</ClInclude>

View File

@@ -21,6 +21,7 @@
#include <ripple/overlay/impl/OverlayImpl.h>
#include <ripple/overlay/impl/PeerDoor.h>
#include <ripple/overlay/impl/PeerImp.h>
#include <ripple/peerfinder/make_Manager.h>
#include <beast/ByteOrder.h>
#if DOXYGEN
@@ -52,6 +53,64 @@ struct get_peer_json
//------------------------------------------------------------------------------
OverlayImpl::Child::Child (OverlayImpl& overlay)
: overlay_(overlay)
{
}
OverlayImpl::Child::~Child()
{
overlay_.remove(*this);
}
//------------------------------------------------------------------------------
OverlayImpl::Timer::Timer (OverlayImpl& overlay)
: Child(overlay)
, timer_(overlay_.io_service_)
{
}
void
OverlayImpl::Timer::close()
{
error_code ec;
timer_.cancel(ec);
}
void
OverlayImpl::Timer::run()
{
error_code ec;
timer_.expires_from_now (std::chrono::seconds(1));
timer_.async_wait(overlay_.strand_.wrap(
std::bind(&Timer::on_timer, shared_from_this(),
beast::asio::placeholders::error)));
}
void
OverlayImpl::Timer::on_timer (error_code ec)
{
if (ec || overlay_.isStopping())
{
if (ec && ec != boost::asio::error::operation_aborted)
if (overlay_.journal_.error) overlay_.journal_.error <<
"on_timer: " << ec.message();
return;
}
overlay_.m_peerFinder->once_per_second();
overlay_.sendpeers();
overlay_.autoconnect();
timer_.expires_from_now (std::chrono::seconds(1));
timer_.async_wait(overlay_.strand_.wrap(std::bind(
&Timer::on_timer, shared_from_this(),
beast::asio::placeholders::error)));
}
//------------------------------------------------------------------------------
OverlayImpl::OverlayImpl (
Setup const& setup,
Stoppable& parent,
@@ -61,28 +120,30 @@ OverlayImpl::OverlayImpl (
Resolver& resolver,
boost::asio::io_service& io_service)
: Overlay (parent)
, io_service_ (io_service)
, work_ (boost::in_place(std::ref(io_service_)))
, strand_ (io_service_)
, setup_(setup)
, m_child_count (1)
, m_journal (deprecatedLogs().journal("Overlay"))
, journal_ (deprecatedLogs().journal("Overlay"))
, m_resourceManager (resourceManager)
, m_peerFinder (add (PeerFinder::Manager::New (*this,
pathToDbFileOrDirectory, get_seconds_clock (),
deprecatedLogs().journal("PeerFinder"))))
, m_io_service (io_service)
, timer_(io_service)
, m_peerFinder (PeerFinder::make_Manager (*this, io_service,
pathToDbFileOrDirectory, get_seconds_clock(),
deprecatedLogs().journal("PeerFinder")))
, m_resolver (resolver)
, m_nextShortId (0)
{
beast::PropertyStream::Source::add (m_peerFinder.get());
}
OverlayImpl::~OverlayImpl ()
{
close();
// Block until dependent objects have been destroyed.
// This is just to catch improper use of the Stoppable API.
//
std::unique_lock <decltype(m_mutex)> lock (m_mutex);
m_cond.wait (lock, [this] {
return this->m_child_count == 0; });
std::unique_lock <decltype(mutex_)> lock (mutex_);
cond_.wait (lock, [this] { return list_.empty(); });
}
void
@@ -114,31 +175,25 @@ OverlayImpl::accept (socket_type&& socket)
*m_peerFinder, slot, setup_.context));
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
std::lock_guard <decltype(mutex_)> lock (mutex_);
{
std::pair <PeersBySlot::iterator, bool> const result (
m_peers.emplace (slot, peer));
assert (result.second);
(void) result.second;
}
++m_child_count;
list_.emplace(peer.get(), peer);
// This has to happen while holding the lock,
// otherwise the socket might not be canceled during a stop.
peer->start ();
peer->start();
}
}
void
OverlayImpl::connect (beast::IP::Endpoint const& remote_endpoint)
{
if (isStopping())
{
m_journal.debug <<
"Skipping " << remote_endpoint <<
" connect on stop";
return;
}
assert(work_);
PeerFinder::Slot::ptr const slot (
m_peerFinder->new_outbound_slot (remote_endpoint));
@@ -147,18 +202,18 @@ OverlayImpl::connect (beast::IP::Endpoint const& remote_endpoint)
return;
PeerImp::ptr const peer (std::make_shared <PeerImp> (
remote_endpoint, m_io_service, *this, m_resourceManager,
remote_endpoint, io_service_, *this, m_resourceManager,
*m_peerFinder, slot, setup_.context));
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
std::lock_guard <decltype(mutex_)> lock (mutex_);
{
std::pair <PeersBySlot::iterator, bool> const result (
m_peers.emplace (slot, peer));
assert (result.second);
(void) result.second;
}
++m_child_count;
list_.emplace(peer.get(), peer);
// This has to happen while holding the lock,
// otherwise the socket might not be canceled during a stop.
@@ -174,41 +229,13 @@ OverlayImpl::next_id()
//--------------------------------------------------------------------------
// Check for the stopped condition
// Caller must hold the mutex
void
OverlayImpl::check_stopped ()
{
// To be stopped, child Stoppable objects must be stopped
// and the count of dependent objects must be zero
if (areChildrenStopped () && m_child_count == 0)
{
m_cond.notify_all ();
m_journal.info <<
"Stopped.";
stopped ();
}
}
// Decrement the count of dependent objects
// Caller must hold the mutex
void
OverlayImpl::release ()
{
if (--m_child_count == 0)
check_stopped ();
}
void
OverlayImpl::remove (PeerFinder::Slot::ptr const& slot)
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
std::lock_guard <decltype(mutex_)> lock (mutex_);
PeersBySlot::iterator const iter (m_peers.find (slot));
assert (iter != m_peers.end ());
m_peers.erase (iter);
release();
}
//--------------------------------------------------------------------------
@@ -290,62 +317,36 @@ OverlayImpl::onPrepare ()
if (! getConfig ().RUN_STANDALONE)
{
m_doorDirect = make_PeerDoor (*this, getConfig ().PEER_IP,
getConfig ().peerListeningPort, m_io_service);
getConfig ().peerListeningPort, io_service_);
}
}
void
OverlayImpl::onStart ()
{
// mutex not needed since we aren't running
++m_child_count;
boost::asio::spawn (m_io_service, std::bind (
&OverlayImpl::do_timer, this, std::placeholders::_1));
}
/** Close all peer connections.
If `graceful` is true then active
Requirements:
Caller must hold the mutex.
*/
void
OverlayImpl::close_all (bool graceful)
{
for (auto const& entry : m_peers)
{
PeerImp::ptr const peer (entry.second.lock());
// VFALCO The only case where the weak_ptr is expired should be if
// ~PeerImp is pre-empted before it calls m_peers.remove()
//
if (peer != nullptr)
peer->close();
}
auto const timer = std::make_shared<Timer>(*this);
std::lock_guard <decltype(mutex_)> lock (mutex_);
list_.emplace(timer.get(), timer);
timer_ = timer;
timer->run();
}
void
OverlayImpl::onStop ()
{
error_code ec;
timer_.cancel(ec);
if (m_doorDirect)
m_doorDirect->stop();
if (m_doorProxy)
m_doorProxy->stop();
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
// Take off the extra count we added in the constructor
release();
close_all (false);
strand_.dispatch(std::bind(&OverlayImpl::close, this));
}
void
OverlayImpl::onChildrenStopped ()
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
check_stopped ();
std::lock_guard <decltype(mutex_)> lock (mutex_);
checkStopped ();
}
//--------------------------------------------------------------------------
@@ -368,7 +369,7 @@ OverlayImpl::onWrite (beast::PropertyStream::Map& stream)
void
OverlayImpl::activate (Peer::ptr const& peer)
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
std::lock_guard <decltype(mutex_)> lock (mutex_);
// Now track this peer
{
@@ -389,7 +390,7 @@ OverlayImpl::activate (Peer::ptr const& peer)
(void) result.second;
}
m_journal.debug <<
journal_.debug <<
"activated " << peer->getRemoteAddress() <<
" (" << peer->getShortId() <<
":" << RipplePublicKey(peer->getNodePublic()) << ")";
@@ -406,7 +407,7 @@ OverlayImpl::activate (Peer::ptr const& peer)
void
OverlayImpl::onPeerDisconnect (Peer::ptr const& peer)
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
std::lock_guard <decltype(mutex_)> lock (mutex_);
m_shortIdMap.erase (peer->getShortId ());
m_publicKeyMap.erase (peer->getNodePublic ());
}
@@ -416,9 +417,9 @@ OverlayImpl::onPeerDisconnect (Peer::ptr const& peer)
and are running the Ripple protocol.
*/
std::size_t
OverlayImpl::size ()
OverlayImpl::size()
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
std::lock_guard <decltype(mutex_)> lock (mutex_);
return m_publicKeyMap.size ();
}
@@ -434,7 +435,7 @@ OverlayImpl::getActivePeers ()
{
Overlay::PeerSequence ret;
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
std::lock_guard <decltype(mutex_)> lock (mutex_);
ret.reserve (m_publicKeyMap.size ());
@@ -450,7 +451,7 @@ OverlayImpl::getActivePeers ()
Peer::ptr
OverlayImpl::findPeerByShortID (Peer::ShortId const& id)
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
std::lock_guard <decltype(mutex_)> lock (mutex_);
PeerByShortId::iterator const iter (
m_shortIdMap.find (id));
if (iter != m_shortIdMap.end ())
@@ -461,11 +462,38 @@ OverlayImpl::findPeerByShortID (Peer::ShortId const& id)
//------------------------------------------------------------------------------
void
OverlayImpl::autoconnect()
OverlayImpl::remove (Child& child)
{
auto const result = m_peerFinder->autoconnect();
for (auto addr : result)
connect (addr);
std::lock_guard<decltype(mutex_)> lock(mutex_);
list_.erase(&child);
if (list_.empty())
checkStopped();
}
void
OverlayImpl::close()
{
std::lock_guard<decltype(mutex_)> lock(mutex_);
if (work_)
{
work_ = boost::none;
for (auto& _ : list_)
{
auto const child = _.second.lock();
// Happens when the child is about to be destroyed
if (child != nullptr)
child->close();
}
}
}
// Check for the stopped condition
// Caller must hold the mutex
void
OverlayImpl::checkStopped ()
{
if (isStopping() && areChildrenStopped () && list_.empty())
stopped();
}
void
@@ -477,7 +505,7 @@ OverlayImpl::sendpeers()
// VFALCO TODO Make sure this doesn't race with closing the peer
PeerImp::ptr peer;
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
std::lock_guard <decltype(mutex_)> lock (mutex_);
PeersBySlot::iterator const iter = m_peers.find (e.first);
if (iter != m_peers.end())
peer = iter->second.lock();
@@ -488,24 +516,11 @@ OverlayImpl::sendpeers()
}
void
OverlayImpl::do_timer (yield_context yield)
OverlayImpl::autoconnect()
{
for(;;)
{
m_peerFinder->once_per_second();
sendpeers();
autoconnect();
timer_.expires_from_now (std::chrono::seconds(1));
error_code ec;
timer_.async_wait (yield[ec]);
if (ec == boost::asio::error::operation_aborted)
break;
}
// Take off a reference
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
release();
auto const result = m_peerFinder->autoconnect();
for (auto addr : result)
connect (addr);
}
//------------------------------------------------------------------------------

View File

@@ -31,6 +31,7 @@
#include <boost/asio/ssl/context.hpp>
#include <boost/asio/basic_waitable_timer.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/container/flat_map.hpp>
#include <beast/cxx14/memory.h> // <memory>
#include <atomic>
#include <cassert>
@@ -46,38 +47,69 @@ class PeerImp;
class OverlayImpl : public Overlay
{
public:
class Child
{
protected:
OverlayImpl& overlay_;
explicit
Child (OverlayImpl& overlay);
virtual ~Child();
public:
virtual void close() = 0;
};
private:
using clock_type = std::chrono::steady_clock;
using socket_type = boost::asio::ip::tcp::socket;
using error_code = boost::system::error_code;
using yield_context = boost::asio::yield_context;
typedef hash_map <PeerFinder::Slot::ptr,
std::weak_ptr <PeerImp>> PeersBySlot;
using PeersBySlot = hash_map <PeerFinder::Slot::ptr,
std::weak_ptr <PeerImp>>;
typedef hash_map <RippleAddress, Peer::ptr> PeerByPublicKey;
using PeerByPublicKey = hash_map<RippleAddress, Peer::ptr>;
typedef hash_map <Peer::ShortId, Peer::ptr> PeerByShortId;
using PeerByShortId = hash_map<Peer::ShortId, Peer::ptr>;
struct Timer
: Child
, std::enable_shared_from_this<Timer>
{
boost::asio::basic_waitable_timer <clock_type> timer_;
explicit
Timer (OverlayImpl& overlay);
void
close() override;
void
run();
void
on_timer (error_code ec);
};
boost::asio::io_service& io_service_;
boost::optional<boost::asio::io_service::work> work_;
boost::asio::io_service::strand strand_;
std::recursive_mutex mutex_; // VFALCO use std::mutex
std::condition_variable_any cond_;
std::weak_ptr<Timer> timer_;
boost::container::flat_map<
Child*, std::weak_ptr<Child>> list_;
Setup setup_;
// VFALCO TODO Change to regular mutex and eliminate re-entrancy
std::recursive_mutex m_mutex;
// Blocks us until dependent objects have been destroyed
std::condition_variable_any m_cond;
// Number of dependencies that must be destroyed before we can stop
std::size_t m_child_count;
beast::Journal m_journal;
beast::Journal journal_;
Resource::Manager& m_resourceManager;
std::unique_ptr <PeerFinder::Manager> m_peerFinder;
boost::asio::io_service& m_io_service;
boost::asio::basic_waitable_timer <clock_type> timer_;
/** Associates slots to peers. */
PeersBySlot m_peers;
@@ -165,32 +197,21 @@ private:
//--------------------------------------------------------------------------
void
check_stopped ();
//
// Stoppable
//
void
onPrepare () override;
onPrepare() override;
void
onStart () override;
/** Close all peer connections.
If `graceful` is true then active
Requirements:
Caller must hold the mutex.
*/
void
close_all (bool graceful);
onStart() override;
void
onStop () override;
onStop() override;
void
onChildrenStopped () override;
onChildrenStopped() override;
//
// PropertyStream
@@ -202,16 +223,19 @@ private:
//--------------------------------------------------------------------------
void
release();
remove (Child& child);
void
close();
void
checkStopped ();
void
sendpeers();
void
autoconnect();
void
do_timer (yield_context yield);
};
} // ripple

View File

@@ -30,7 +30,8 @@ PeerImp::PeerImp (socket_type&& socket, beast::IP::Endpoint remoteAddress,
OverlayImpl& overlay, Resource::Manager& resourceManager,
PeerFinder::Manager& peerFinder, PeerFinder::Slot::ptr const& slot,
std::shared_ptr<boost::asio::ssl::context> const& context)
: journal_ (deprecatedLogs().journal("Peer"))
: Child (overlay)
, journal_ (deprecatedLogs().journal("Peer"))
, ssl_bundle_(std::make_unique<beast::asio::ssl_bundle>(
context, std::move(socket)))
, socket_ (ssl_bundle_->socket)
@@ -53,7 +54,8 @@ PeerImp::PeerImp (beast::IP::Endpoint remoteAddress,
Resource::Manager& resourceManager, PeerFinder::Manager& peerFinder,
PeerFinder::Slot::ptr const& slot,
std::shared_ptr<boost::asio::ssl::context> const& context)
: journal_ (deprecatedLogs().journal("Peer"))
: Child (overlay)
, journal_ (deprecatedLogs().journal("Peer"))
, ssl_bundle_(std::make_unique<beast::asio::ssl_bundle>(
context, io_service))
, socket_ (ssl_bundle_->socket)

View File

@@ -62,6 +62,7 @@ std::ostream& operator<< (std::ostream& os, PeerImp const* peer);
class PeerImp
: public Peer
, public std::enable_shared_from_this <PeerImp>
, public OverlayImpl::Child
, private beast::LeakChecked <Peer>
, private abstract_protocol_handler
{
@@ -202,15 +203,15 @@ public:
// Begin asynchronous initiation function calls
void
start ();
// Cancel all I/O and close the socket
void
close();
start();
void
getLedger (protocol::TMGetLedger& packet);
// Cancel all I/O and close the socket
void
close() override;
//
// Network
//

View File

@@ -121,11 +121,6 @@ protected:
explicit Manager (Stoppable& parent);
public:
/** Create a new Manager. */
static Manager* New (Stoppable& parent,
beast::File const& pathToDbFileOrDirectory,
clock_type& clock, beast::Journal journal);
/** Destroy the object.
Any pending source fetch operations are aborted.
There may be some listener calls made before the

View File

@@ -24,6 +24,7 @@
#include <ripple/peerfinder/impl/StoreSqdb.h>
#include <boost/asio/io_service.hpp>
#include <boost/optional.hpp>
#include <beast/cxx14/memory.h> // <memory>
#include <thread>
#if DOXYGEN
@@ -38,6 +39,8 @@ class ManagerImp
, public beast::LeakChecked <ManagerImp>
{
public:
boost::asio::io_service &io_service_;
boost::optional <boost::asio::io_service::work> work_;
beast::File m_databaseFile;
clock_type& m_clock;
beast::Journal m_journal;
@@ -45,19 +48,17 @@ public:
Checker<boost::asio::ip::tcp> checker_;
Logic <decltype(checker_)> m_logic;
// Temporary
std::thread thread_;
boost::asio::io_service io_service_;
boost::optional <boost::asio::io_service::work> work_;
//--------------------------------------------------------------------------
ManagerImp (
Stoppable& stoppable,
boost::asio::io_service& io_service,
beast::File const& pathToDbFileOrDirectory,
clock_type& clock,
beast::Journal journal)
: Manager (stoppable)
, io_service_(io_service)
, work_(boost::in_place(std::ref(io_service_)))
, m_databaseFile (pathToDbFileOrDirectory)
, m_clock (clock)
, m_journal (journal)
@@ -67,23 +68,21 @@ public:
{
if (m_databaseFile.isDirectory ())
m_databaseFile = m_databaseFile.getChildFile("peerfinder.sqlite");
work_ = boost::in_place (std::ref(io_service_));
thread_ = std::thread ([&]() { this->io_service_.run(); });
}
~ManagerImp()
{
stop();
close();
}
void
stop()
close()
{
if (thread_.joinable())
if (work_)
{
work_ = boost::none;
thread_.join();
checker_.stop();
m_logic.stop();
}
}
@@ -228,15 +227,8 @@ public:
void onStop ()
{
m_journal.debug << "Stopping";
checker_.stop();
m_logic.stop();
close();
stopped();
/*
signalThreadShouldExit();
m_queue.dispatch (m_context.wrap (
std::bind (&Thread::signalThreadShouldExit, this)));
*/
}
//--------------------------------------------------------------------------
@@ -259,10 +251,12 @@ Manager::Manager (Stoppable& parent)
{
}
Manager* Manager::New (Stoppable& parent, beast::File const& databaseFile,
clock_type& clock, beast::Journal journal)
std::unique_ptr<Manager>
make_Manager (beast::Stoppable& parent, boost::asio::io_service& io_service,
beast::File const& databaseFile, clock_type& clock, beast::Journal journal)
{
return new ManagerImp (parent, databaseFile, clock, journal);
return std::make_unique<ManagerImp> (
parent, io_service, databaseFile, clock, journal);
}
}

View File

@@ -0,0 +1,39 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_PEERFINDER_MAKE_MANAGER_H_INCLUDED
#define RIPPLE_PEERFINDER_MAKE_MANAGER_H_INCLUDED
#include <ripple/peerfinder/Manager.h>
#include <boost/asio/io_service.hpp>
#include <memory>
namespace ripple {
namespace PeerFinder {
/** Create a new Manager. */
std::unique_ptr<Manager>
make_Manager (beast::Stoppable& parent, boost::asio::io_service& io_service,
beast::File const& pathToDbFileOrDirectory,
clock_type& clock, beast::Journal journal);
}
}
#endif