diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj
index a267e87dd..e617f09ff 100644
--- a/Builds/VisualStudio2013/RippleD.vcxproj
+++ b/Builds/VisualStudio2013/RippleD.vcxproj
@@ -2697,6 +2697,8 @@
+
+
diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters
index a8f920bf2..c67f11064 100644
--- a/Builds/VisualStudio2013/RippleD.vcxproj.filters
+++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters
@@ -3786,6 +3786,9 @@
ripple\peerfinder\impl
+
+ ripple\peerfinder
+
ripple\peerfinder
diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp
index 7f690bc0e..586a3985f 100644
--- a/src/ripple/overlay/impl/OverlayImpl.cpp
+++ b/src/ripple/overlay/impl/OverlayImpl.cpp
@@ -21,6 +21,7 @@
#include
#include
#include
+#include
#include
#if DOXYGEN
@@ -52,6 +53,64 @@ struct get_peer_json
//------------------------------------------------------------------------------
+OverlayImpl::Child::Child (OverlayImpl& overlay)
+ : overlay_(overlay)
+{
+}
+
+OverlayImpl::Child::~Child()
+{
+ overlay_.remove(*this);
+}
+
+//------------------------------------------------------------------------------
+
+OverlayImpl::Timer::Timer (OverlayImpl& overlay)
+ : Child(overlay)
+ , timer_(overlay_.io_service_)
+{
+}
+
+void
+OverlayImpl::Timer::close()
+{
+ error_code ec;
+ timer_.cancel(ec);
+}
+
+void
+OverlayImpl::Timer::run()
+{
+ error_code ec;
+ timer_.expires_from_now (std::chrono::seconds(1));
+ timer_.async_wait(overlay_.strand_.wrap(
+ std::bind(&Timer::on_timer, shared_from_this(),
+ beast::asio::placeholders::error)));
+}
+
+void
+OverlayImpl::Timer::on_timer (error_code ec)
+{
+ if (ec || overlay_.isStopping())
+ {
+ if (ec && ec != boost::asio::error::operation_aborted)
+ if (overlay_.journal_.error) overlay_.journal_.error <<
+ "on_timer: " << ec.message();
+ return;
+ }
+
+ overlay_.m_peerFinder->once_per_second();
+ overlay_.sendpeers();
+ overlay_.autoconnect();
+
+ timer_.expires_from_now (std::chrono::seconds(1));
+ timer_.async_wait(overlay_.strand_.wrap(std::bind(
+ &Timer::on_timer, shared_from_this(),
+ beast::asio::placeholders::error)));
+}
+
+//------------------------------------------------------------------------------
+
OverlayImpl::OverlayImpl (
Setup const& setup,
Stoppable& parent,
@@ -61,28 +120,30 @@ OverlayImpl::OverlayImpl (
Resolver& resolver,
boost::asio::io_service& io_service)
: Overlay (parent)
+ , io_service_ (io_service)
+ , work_ (boost::in_place(std::ref(io_service_)))
+ , strand_ (io_service_)
, setup_(setup)
- , m_child_count (1)
- , m_journal (deprecatedLogs().journal("Overlay"))
+ , journal_ (deprecatedLogs().journal("Overlay"))
, m_resourceManager (resourceManager)
- , m_peerFinder (add (PeerFinder::Manager::New (*this,
- pathToDbFileOrDirectory, get_seconds_clock (),
- deprecatedLogs().journal("PeerFinder"))))
- , m_io_service (io_service)
- , timer_(io_service)
+ , m_peerFinder (PeerFinder::make_Manager (*this, io_service,
+ pathToDbFileOrDirectory, get_seconds_clock(),
+ deprecatedLogs().journal("PeerFinder")))
, m_resolver (resolver)
, m_nextShortId (0)
{
+ beast::PropertyStream::Source::add (m_peerFinder.get());
}
OverlayImpl::~OverlayImpl ()
{
+ close();
+
// Block until dependent objects have been destroyed.
// This is just to catch improper use of the Stoppable API.
//
- std::unique_lock lock (m_mutex);
- m_cond.wait (lock, [this] {
- return this->m_child_count == 0; });
+ std::unique_lock lock (mutex_);
+ cond_.wait (lock, [this] { return list_.empty(); });
}
void
@@ -114,31 +175,25 @@ OverlayImpl::accept (socket_type&& socket)
*m_peerFinder, slot, setup_.context));
{
- std::lock_guard lock (m_mutex);
+ std::lock_guard lock (mutex_);
{
std::pair const result (
m_peers.emplace (slot, peer));
assert (result.second);
(void) result.second;
}
- ++m_child_count;
+ list_.emplace(peer.get(), peer);
// This has to happen while holding the lock,
// otherwise the socket might not be canceled during a stop.
- peer->start ();
+ peer->start();
}
}
void
OverlayImpl::connect (beast::IP::Endpoint const& remote_endpoint)
{
- if (isStopping())
- {
- m_journal.debug <<
- "Skipping " << remote_endpoint <<
- " connect on stop";
- return;
- }
+ assert(work_);
PeerFinder::Slot::ptr const slot (
m_peerFinder->new_outbound_slot (remote_endpoint));
@@ -147,18 +202,18 @@ OverlayImpl::connect (beast::IP::Endpoint const& remote_endpoint)
return;
PeerImp::ptr const peer (std::make_shared (
- remote_endpoint, m_io_service, *this, m_resourceManager,
+ remote_endpoint, io_service_, *this, m_resourceManager,
*m_peerFinder, slot, setup_.context));
{
- std::lock_guard lock (m_mutex);
+ std::lock_guard lock (mutex_);
{
std::pair const result (
m_peers.emplace (slot, peer));
assert (result.second);
(void) result.second;
}
- ++m_child_count;
+ list_.emplace(peer.get(), peer);
// This has to happen while holding the lock,
// otherwise the socket might not be canceled during a stop.
@@ -174,41 +229,13 @@ OverlayImpl::next_id()
//--------------------------------------------------------------------------
-// Check for the stopped condition
-// Caller must hold the mutex
-void
-OverlayImpl::check_stopped ()
-{
- // To be stopped, child Stoppable objects must be stopped
- // and the count of dependent objects must be zero
- if (areChildrenStopped () && m_child_count == 0)
- {
- m_cond.notify_all ();
- m_journal.info <<
- "Stopped.";
- stopped ();
- }
-}
-
-// Decrement the count of dependent objects
-// Caller must hold the mutex
-void
-OverlayImpl::release ()
-{
- if (--m_child_count == 0)
- check_stopped ();
-}
-
void
OverlayImpl::remove (PeerFinder::Slot::ptr const& slot)
{
- std::lock_guard lock (m_mutex);
-
+ std::lock_guard lock (mutex_);
PeersBySlot::iterator const iter (m_peers.find (slot));
assert (iter != m_peers.end ());
m_peers.erase (iter);
-
- release();
}
//--------------------------------------------------------------------------
@@ -290,62 +317,36 @@ OverlayImpl::onPrepare ()
if (! getConfig ().RUN_STANDALONE)
{
m_doorDirect = make_PeerDoor (*this, getConfig ().PEER_IP,
- getConfig ().peerListeningPort, m_io_service);
+ getConfig ().peerListeningPort, io_service_);
}
}
void
OverlayImpl::onStart ()
{
- // mutex not needed since we aren't running
- ++m_child_count;
- boost::asio::spawn (m_io_service, std::bind (
- &OverlayImpl::do_timer, this, std::placeholders::_1));
-}
-
-/** Close all peer connections.
- If `graceful` is true then active
- Requirements:
- Caller must hold the mutex.
-*/
-void
-OverlayImpl::close_all (bool graceful)
-{
- for (auto const& entry : m_peers)
- {
- PeerImp::ptr const peer (entry.second.lock());
-
- // VFALCO The only case where the weak_ptr is expired should be if
- // ~PeerImp is pre-empted before it calls m_peers.remove()
- //
- if (peer != nullptr)
- peer->close();
- }
+ auto const timer = std::make_shared(*this);
+ std::lock_guard lock (mutex_);
+ list_.emplace(timer.get(), timer);
+ timer_ = timer;
+ timer->run();
}
void
OverlayImpl::onStop ()
{
- error_code ec;
- timer_.cancel(ec);
-
if (m_doorDirect)
m_doorDirect->stop();
if (m_doorProxy)
m_doorProxy->stop();
- std::lock_guard lock (m_mutex);
- // Take off the extra count we added in the constructor
- release();
-
- close_all (false);
+ strand_.dispatch(std::bind(&OverlayImpl::close, this));
}
void
OverlayImpl::onChildrenStopped ()
{
- std::lock_guard lock (m_mutex);
- check_stopped ();
+ std::lock_guard lock (mutex_);
+ checkStopped ();
}
//--------------------------------------------------------------------------
@@ -368,7 +369,7 @@ OverlayImpl::onWrite (beast::PropertyStream::Map& stream)
void
OverlayImpl::activate (Peer::ptr const& peer)
{
- std::lock_guard lock (m_mutex);
+ std::lock_guard lock (mutex_);
// Now track this peer
{
@@ -389,7 +390,7 @@ OverlayImpl::activate (Peer::ptr const& peer)
(void) result.second;
}
- m_journal.debug <<
+ journal_.debug <<
"activated " << peer->getRemoteAddress() <<
" (" << peer->getShortId() <<
":" << RipplePublicKey(peer->getNodePublic()) << ")";
@@ -406,7 +407,7 @@ OverlayImpl::activate (Peer::ptr const& peer)
void
OverlayImpl::onPeerDisconnect (Peer::ptr const& peer)
{
- std::lock_guard lock (m_mutex);
+ std::lock_guard lock (mutex_);
m_shortIdMap.erase (peer->getShortId ());
m_publicKeyMap.erase (peer->getNodePublic ());
}
@@ -416,9 +417,9 @@ OverlayImpl::onPeerDisconnect (Peer::ptr const& peer)
and are running the Ripple protocol.
*/
std::size_t
-OverlayImpl::size ()
+OverlayImpl::size()
{
- std::lock_guard lock (m_mutex);
+ std::lock_guard lock (mutex_);
return m_publicKeyMap.size ();
}
@@ -434,7 +435,7 @@ OverlayImpl::getActivePeers ()
{
Overlay::PeerSequence ret;
- std::lock_guard lock (m_mutex);
+ std::lock_guard lock (mutex_);
ret.reserve (m_publicKeyMap.size ());
@@ -450,7 +451,7 @@ OverlayImpl::getActivePeers ()
Peer::ptr
OverlayImpl::findPeerByShortID (Peer::ShortId const& id)
{
- std::lock_guard lock (m_mutex);
+ std::lock_guard lock (mutex_);
PeerByShortId::iterator const iter (
m_shortIdMap.find (id));
if (iter != m_shortIdMap.end ())
@@ -461,11 +462,38 @@ OverlayImpl::findPeerByShortID (Peer::ShortId const& id)
//------------------------------------------------------------------------------
void
-OverlayImpl::autoconnect()
+OverlayImpl::remove (Child& child)
{
- auto const result = m_peerFinder->autoconnect();
- for (auto addr : result)
- connect (addr);
+ std::lock_guard lock(mutex_);
+ list_.erase(&child);
+ if (list_.empty())
+ checkStopped();
+}
+
+void
+OverlayImpl::close()
+{
+ std::lock_guard lock(mutex_);
+ if (work_)
+ {
+ work_ = boost::none;
+ for (auto& _ : list_)
+ {
+ auto const child = _.second.lock();
+ // Happens when the child is about to be destroyed
+ if (child != nullptr)
+ child->close();
+ }
+ }
+}
+
+// Check for the stopped condition
+// Caller must hold the mutex
+void
+OverlayImpl::checkStopped ()
+{
+ if (isStopping() && areChildrenStopped () && list_.empty())
+ stopped();
}
void
@@ -477,7 +505,7 @@ OverlayImpl::sendpeers()
// VFALCO TODO Make sure this doesn't race with closing the peer
PeerImp::ptr peer;
{
- std::lock_guard lock (m_mutex);
+ std::lock_guard lock (mutex_);
PeersBySlot::iterator const iter = m_peers.find (e.first);
if (iter != m_peers.end())
peer = iter->second.lock();
@@ -488,24 +516,11 @@ OverlayImpl::sendpeers()
}
void
-OverlayImpl::do_timer (yield_context yield)
+OverlayImpl::autoconnect()
{
- for(;;)
- {
- m_peerFinder->once_per_second();
- sendpeers();
- autoconnect();
-
- timer_.expires_from_now (std::chrono::seconds(1));
- error_code ec;
- timer_.async_wait (yield[ec]);
- if (ec == boost::asio::error::operation_aborted)
- break;
- }
-
- // Take off a reference
- std::lock_guard lock (m_mutex);
- release();
+ auto const result = m_peerFinder->autoconnect();
+ for (auto addr : result)
+ connect (addr);
}
//------------------------------------------------------------------------------
diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h
index 4e5968606..adcf8f781 100644
--- a/src/ripple/overlay/impl/OverlayImpl.h
+++ b/src/ripple/overlay/impl/OverlayImpl.h
@@ -31,6 +31,7 @@
#include
#include
#include
+#include
#include //
#include
#include
@@ -46,38 +47,69 @@ class PeerImp;
class OverlayImpl : public Overlay
{
+public:
+ class Child
+ {
+ protected:
+ OverlayImpl& overlay_;
+
+ explicit
+ Child (OverlayImpl& overlay);
+
+ virtual ~Child();
+
+ public:
+ virtual void close() = 0;
+ };
+
private:
using clock_type = std::chrono::steady_clock;
using socket_type = boost::asio::ip::tcp::socket;
using error_code = boost::system::error_code;
using yield_context = boost::asio::yield_context;
- typedef hash_map > PeersBySlot;
+ using PeersBySlot = hash_map >;
- typedef hash_map PeerByPublicKey;
+ using PeerByPublicKey = hash_map;
- typedef hash_map PeerByShortId;
+ using PeerByShortId = hash_map;
+
+ struct Timer
+ : Child
+ , std::enable_shared_from_this
+ {
+ boost::asio::basic_waitable_timer timer_;
+
+ explicit
+ Timer (OverlayImpl& overlay);
+
+ void
+ close() override;
+
+ void
+ run();
+
+ void
+ on_timer (error_code ec);
+ };
+
+ boost::asio::io_service& io_service_;
+ boost::optional work_;
+ boost::asio::io_service::strand strand_;
+
+ std::recursive_mutex mutex_; // VFALCO use std::mutex
+ std::condition_variable_any cond_;
+ std::weak_ptr timer_;
+ boost::container::flat_map<
+ Child*, std::weak_ptr> list_;
Setup setup_;
-
- // VFALCO TODO Change to regular mutex and eliminate re-entrancy
- std::recursive_mutex m_mutex;
-
- // Blocks us until dependent objects have been destroyed
- std::condition_variable_any m_cond;
-
- // Number of dependencies that must be destroyed before we can stop
- std::size_t m_child_count;
-
- beast::Journal m_journal;
+ beast::Journal journal_;
Resource::Manager& m_resourceManager;
std::unique_ptr m_peerFinder;
- boost::asio::io_service& m_io_service;
- boost::asio::basic_waitable_timer timer_;
-
/** Associates slots to peers. */
PeersBySlot m_peers;
@@ -165,32 +197,21 @@ private:
//--------------------------------------------------------------------------
- void
- check_stopped ();
-
//
// Stoppable
//
void
- onPrepare () override;
+ onPrepare() override;
void
- onStart () override;
-
- /** Close all peer connections.
- If `graceful` is true then active
- Requirements:
- Caller must hold the mutex.
- */
- void
- close_all (bool graceful);
+ onStart() override;
void
- onStop () override;
+ onStop() override;
void
- onChildrenStopped () override;
+ onChildrenStopped() override;
//
// PropertyStream
@@ -202,16 +223,19 @@ private:
//--------------------------------------------------------------------------
void
- release();
+ remove (Child& child);
+
+ void
+ close();
+
+ void
+ checkStopped ();
void
sendpeers();
void
autoconnect();
-
- void
- do_timer (yield_context yield);
};
} // ripple
diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp
index d98714638..1da752760 100644
--- a/src/ripple/overlay/impl/PeerImp.cpp
+++ b/src/ripple/overlay/impl/PeerImp.cpp
@@ -30,7 +30,8 @@ PeerImp::PeerImp (socket_type&& socket, beast::IP::Endpoint remoteAddress,
OverlayImpl& overlay, Resource::Manager& resourceManager,
PeerFinder::Manager& peerFinder, PeerFinder::Slot::ptr const& slot,
std::shared_ptr const& context)
- : journal_ (deprecatedLogs().journal("Peer"))
+ : Child (overlay)
+ , journal_ (deprecatedLogs().journal("Peer"))
, ssl_bundle_(std::make_unique(
context, std::move(socket)))
, socket_ (ssl_bundle_->socket)
@@ -53,7 +54,8 @@ PeerImp::PeerImp (beast::IP::Endpoint remoteAddress,
Resource::Manager& resourceManager, PeerFinder::Manager& peerFinder,
PeerFinder::Slot::ptr const& slot,
std::shared_ptr const& context)
- : journal_ (deprecatedLogs().journal("Peer"))
+ : Child (overlay)
+ , journal_ (deprecatedLogs().journal("Peer"))
, ssl_bundle_(std::make_unique(
context, io_service))
, socket_ (ssl_bundle_->socket)
diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h
index 63864faea..dd08df9bf 100644
--- a/src/ripple/overlay/impl/PeerImp.h
+++ b/src/ripple/overlay/impl/PeerImp.h
@@ -62,6 +62,7 @@ std::ostream& operator<< (std::ostream& os, PeerImp const* peer);
class PeerImp
: public Peer
, public std::enable_shared_from_this
+ , public OverlayImpl::Child
, private beast::LeakChecked
, private abstract_protocol_handler
{
@@ -202,15 +203,15 @@ public:
// Begin asynchronous initiation function calls
void
- start ();
-
- // Cancel all I/O and close the socket
- void
- close();
+ start();
void
getLedger (protocol::TMGetLedger& packet);
+ // Cancel all I/O and close the socket
+ void
+ close() override;
+
//
// Network
//
diff --git a/src/ripple/peerfinder/Manager.h b/src/ripple/peerfinder/Manager.h
index f900e0fd7..50a8de7db 100644
--- a/src/ripple/peerfinder/Manager.h
+++ b/src/ripple/peerfinder/Manager.h
@@ -121,11 +121,6 @@ protected:
explicit Manager (Stoppable& parent);
public:
- /** Create a new Manager. */
- static Manager* New (Stoppable& parent,
- beast::File const& pathToDbFileOrDirectory,
- clock_type& clock, beast::Journal journal);
-
/** Destroy the object.
Any pending source fetch operations are aborted.
There may be some listener calls made before the
diff --git a/src/ripple/peerfinder/impl/Manager.cpp b/src/ripple/peerfinder/impl/Manager.cpp
index 23c2a695e..a6ddb8509 100644
--- a/src/ripple/peerfinder/impl/Manager.cpp
+++ b/src/ripple/peerfinder/impl/Manager.cpp
@@ -24,6 +24,7 @@
#include
#include
#include
+#include //
#include
#if DOXYGEN
@@ -38,6 +39,8 @@ class ManagerImp
, public beast::LeakChecked
{
public:
+ boost::asio::io_service &io_service_;
+ boost::optional work_;
beast::File m_databaseFile;
clock_type& m_clock;
beast::Journal m_journal;
@@ -45,19 +48,17 @@ public:
Checker checker_;
Logic m_logic;
- // Temporary
- std::thread thread_;
- boost::asio::io_service io_service_;
- boost::optional work_;
-
//--------------------------------------------------------------------------
ManagerImp (
Stoppable& stoppable,
+ boost::asio::io_service& io_service,
beast::File const& pathToDbFileOrDirectory,
clock_type& clock,
beast::Journal journal)
: Manager (stoppable)
+ , io_service_(io_service)
+ , work_(boost::in_place(std::ref(io_service_)))
, m_databaseFile (pathToDbFileOrDirectory)
, m_clock (clock)
, m_journal (journal)
@@ -67,23 +68,21 @@ public:
{
if (m_databaseFile.isDirectory ())
m_databaseFile = m_databaseFile.getChildFile("peerfinder.sqlite");
-
- work_ = boost::in_place (std::ref(io_service_));
- thread_ = std::thread ([&]() { this->io_service_.run(); });
}
~ManagerImp()
{
- stop();
+ close();
}
void
- stop()
+ close()
{
- if (thread_.joinable())
+ if (work_)
{
work_ = boost::none;
- thread_.join();
+ checker_.stop();
+ m_logic.stop();
}
}
@@ -228,15 +227,8 @@ public:
void onStop ()
{
- m_journal.debug << "Stopping";
- checker_.stop();
- m_logic.stop();
+ close();
stopped();
- /*
- signalThreadShouldExit();
- m_queue.dispatch (m_context.wrap (
- std::bind (&Thread::signalThreadShouldExit, this)));
- */
}
//--------------------------------------------------------------------------
@@ -259,10 +251,12 @@ Manager::Manager (Stoppable& parent)
{
}
-Manager* Manager::New (Stoppable& parent, beast::File const& databaseFile,
- clock_type& clock, beast::Journal journal)
+std::unique_ptr
+make_Manager (beast::Stoppable& parent, boost::asio::io_service& io_service,
+ beast::File const& databaseFile, clock_type& clock, beast::Journal journal)
{
- return new ManagerImp (parent, databaseFile, clock, journal);
+ return std::make_unique (
+ parent, io_service, databaseFile, clock, journal);
}
}
diff --git a/src/ripple/peerfinder/make_Manager.h b/src/ripple/peerfinder/make_Manager.h
new file mode 100644
index 000000000..aaf6d507f
--- /dev/null
+++ b/src/ripple/peerfinder/make_Manager.h
@@ -0,0 +1,39 @@
+//------------------------------------------------------------------------------
+/*
+ This file is part of rippled: https://github.com/ripple/rippled
+ Copyright (c) 2012, 2013 Ripple Labs Inc.
+
+ Permission to use, copy, modify, and/or distribute this software for any
+ purpose with or without fee is hereby granted, provided that the above
+ copyright notice and this permission notice appear in all copies.
+
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+#ifndef RIPPLE_PEERFINDER_MAKE_MANAGER_H_INCLUDED
+#define RIPPLE_PEERFINDER_MAKE_MANAGER_H_INCLUDED
+
+#include
+#include
+#include
+
+namespace ripple {
+namespace PeerFinder {
+
+/** Create a new Manager. */
+std::unique_ptr
+make_Manager (beast::Stoppable& parent, boost::asio::io_service& io_service,
+ beast::File const& pathToDbFileOrDirectory,
+ clock_type& clock, beast::Journal journal);
+
+}
+}
+
+#endif