diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj
index a64f40de29..0145acd94f 100644
--- a/Builds/VisualStudio2013/RippleD.vcxproj
+++ b/Builds/VisualStudio2013/RippleD.vcxproj
@@ -2837,13 +2837,8 @@
-
- True
-
-
-
True
diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters
index 00138232b0..73e1283677 100644
--- a/Builds/VisualStudio2013/RippleD.vcxproj.filters
+++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters
@@ -3984,15 +3984,9 @@
ripple\peerfinder\impl
-
- ripple\peerfinder\impl
-
ripple\peerfinder\impl
-
- ripple\peerfinder\impl
-
ripple\peerfinder\impl
diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp
index d8bc765322..1feb958d24 100644
--- a/src/ripple/overlay/impl/OverlayImpl.cpp
+++ b/src/ripple/overlay/impl/OverlayImpl.cpp
@@ -20,7 +20,6 @@
#include
#include
#include
-
#include
#if DOXYGEN
@@ -63,15 +62,12 @@ OverlayImpl::OverlayImpl (Stoppable& parent,
, m_child_count (1)
, m_journal (deprecatedLogs().journal("Overlay"))
, m_resourceManager (resourceManager)
- , m_peerFinder (add (PeerFinder::Manager::New (
- *this,
- siteFiles,
- pathToDbFileOrDirectory,
- *this,
- get_seconds_clock (),
- deprecatedLogs().journal("PeerFinder"))))
+ , m_peerFinder (add (PeerFinder::Manager::New (*this,
+ pathToDbFileOrDirectory, get_seconds_clock (),
+ deprecatedLogs().journal("PeerFinder"))))
, m_io_service (io_service)
, m_ssl_context (ssl_context)
+ , timer_(io_service)
, m_resolver (resolver)
, m_nextShortId (0)
{
@@ -222,89 +218,6 @@ OverlayImpl::remove (PeerFinder::Slot::ptr const& slot)
release();
}
-//--------------------------------------------------------------------------
-//
-// PeerFinder::Callback
-//
-//--------------------------------------------------------------------------
-
-void
-OverlayImpl::connect (std::vector const& list)
-{
- for (std::vector ::const_iterator iter (list.begin());
- iter != list.end(); ++iter)
- connect (*iter);
-}
-
-void
-OverlayImpl::activate (PeerFinder::Slot::ptr const& slot)
-{
- m_journal.trace <<
- "Activate " << slot->remote_endpoint();
-
- std::lock_guard lock (m_mutex);
-
- PeersBySlot::iterator const iter (m_peers.find (slot));
- assert (iter != m_peers.end ());
- PeerImp::ptr const peer (iter->second.lock());
- assert (peer != nullptr);
- peer->activate ();
-}
-
-void
-OverlayImpl::send (PeerFinder::Slot::ptr const& slot,
- std::vector const& endpoints)
-{
- typedef std::vector List;
- protocol::TMEndpoints tm;
- for (List::const_iterator iter (endpoints.begin());
- iter != endpoints.end(); ++iter)
- {
- PeerFinder::Endpoint const& ep (*iter);
- protocol::TMEndpoint& tme (*tm.add_endpoints());
- if (ep.address.is_v4())
- tme.mutable_ipv4()->set_ipv4(
- beast::toNetworkByteOrder (ep.address.to_v4().value));
- else
- tme.mutable_ipv4()->set_ipv4(0);
- tme.mutable_ipv4()->set_ipv4port (ep.address.port());
-
- tme.set_hops (ep.hops);
- }
-
- tm.set_version (1);
-
- Message::pointer msg (
- std::make_shared (
- tm, protocol::mtENDPOINTS));
-
- {
- std::lock_guard lock (m_mutex);
- PeersBySlot::iterator const iter (m_peers.find (slot));
- assert (iter != m_peers.end ());
- PeerImp::ptr const peer (iter->second.lock());
- assert (peer != nullptr);
- peer->send (msg);
- }
-}
-
-void
-OverlayImpl::disconnect (PeerFinder::Slot::ptr const& slot, bool graceful)
-{
- if (m_journal.trace) m_journal.trace <<
- "Disconnect " << slot->remote_endpoint () <<
- (graceful ? " gracefully" : "");
-
- std::lock_guard lock (m_mutex);
-
- PeersBySlot::iterator const iter (m_peers.find (slot));
- assert (iter != m_peers.end ());
- PeerImp::ptr const peer (iter->second.lock());
- assert (peer != nullptr);
- peer->close (graceful);
- //peer->detach ("disc", false);
-}
-
//--------------------------------------------------------------------------
//
// Stoppable
@@ -405,6 +318,10 @@ OverlayImpl::onPrepare ()
void
OverlayImpl::onStart ()
{
+ // mutex not needed since we aren't running
+ ++m_child_count;
+ boost::asio::spawn (m_io_service, std::bind (
+ &OverlayImpl::do_timer, this, std::placeholders::_1));
}
/** Close all peer connections.
@@ -423,13 +340,16 @@ OverlayImpl::close_all (bool graceful)
// ~PeerImp is pre-empted before it calls m_peers.remove()
//
if (peer != nullptr)
- peer->close (graceful);
+ peer->close();
}
}
void
OverlayImpl::onStop ()
{
+ error_code ec;
+ timer_.cancel(ec);
+
if (m_doorDirect)
m_doorDirect->stop();
if (m_doorProxy)
@@ -467,7 +387,7 @@ OverlayImpl::onWrite (beast::PropertyStream::Map& stream)
are known.
*/
void
-OverlayImpl::onPeerActivated (Peer::ptr const& peer)
+OverlayImpl::activate (Peer::ptr const& peer)
{
std::lock_guard lock (m_mutex);
@@ -561,6 +481,56 @@ OverlayImpl::findPeerByShortID (Peer::ShortId const& id)
//------------------------------------------------------------------------------
+void
+OverlayImpl::autoconnect()
+{
+ auto const result = m_peerFinder->autoconnect();
+ for (auto addr : result)
+ connect (addr);
+}
+
+void
+OverlayImpl::sendpeers()
+{
+ auto const result = m_peerFinder->sendpeers();
+ for (auto const& e : result)
+ {
+ // VFALCO TODO Make sure this doesn't race with closing the peer
+ PeerImp::ptr peer;
+ {
+ std::lock_guard lock (m_mutex);
+ PeersBySlot::iterator const iter = m_peers.find (e.first);
+ if (iter != m_peers.end())
+ peer = iter->second.lock();
+ }
+ if (peer)
+ peer->send_endpoints (e.second.begin(), e.second.end());
+ }
+}
+
+void
+OverlayImpl::do_timer (yield_context yield)
+{
+ for(;;)
+ {
+ m_peerFinder->once_per_second();
+ sendpeers();
+ autoconnect();
+
+ timer_.expires_from_now (std::chrono::seconds(1));
+ error_code ec;
+ timer_.async_wait (yield[ec]);
+ if (ec == boost::asio::error::operation_aborted)
+ break;
+ }
+
+ // Take off a reference
+ std::lock_guard lock (m_mutex);
+ release();
+}
+
+//------------------------------------------------------------------------------
+
std::unique_ptr
make_Overlay (
beast::Stoppable& parent,
diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h
index ce9e3b0227..b60a898215 100644
--- a/src/ripple/overlay/impl/OverlayImpl.h
+++ b/src/ripple/overlay/impl/OverlayImpl.h
@@ -27,13 +27,14 @@
#include
#include
#include
-
#include
#include
-
+#include
+#include
#include //
#include
#include
+#include
#include
#include
#include
@@ -43,12 +44,13 @@ namespace ripple {
class PeerDoor;
class PeerImp;
-class OverlayImpl
- : public Overlay
- , public PeerFinder::Callback
+class OverlayImpl : public Overlay
{
private:
- typedef boost::asio::ip::tcp::socket socket_type;
+ using clock_type = std::chrono::steady_clock;
+ using socket_type = boost::asio::ip::tcp::socket;
+ using error_code = boost::system::error_code;
+ using yield_context = boost::asio::yield_context;
typedef hash_map > PeersBySlot;
@@ -57,6 +59,7 @@ private:
typedef hash_map PeerByShortId;
+ // VFALCO TODO Change to regular mutex and eliminate re-entrancy
std::recursive_mutex m_mutex;
// Blocks us until dependent objects have been destroyed
@@ -72,6 +75,7 @@ private:
boost::asio::io_service& m_io_service;
boost::asio::ssl::context& m_ssl_context;
+ boost::asio::basic_waitable_timer timer_;
/** Associates slots to peers. */
PeersBySlot m_peers;
@@ -149,23 +153,6 @@ public:
void
remove (PeerFinder::Slot::ptr const& slot);
- //
- // PeerFinder::Callback
- //
-
- void
- connect (std::vector const& list);
-
- void
- activate (PeerFinder::Slot::ptr const& slot);
-
- void
- send (PeerFinder::Slot::ptr const& slot,
- std::vector const& endpoints);
-
- void
- disconnect (PeerFinder::Slot::ptr const& slot, bool graceful);
-
//
// Stoppable
//
@@ -205,7 +192,7 @@ public:
are known.
*/
void
- onPeerActivated (Peer::ptr const& peer);
+ activate (Peer::ptr const& peer);
/** A peer is being disconnected
This is called during the disconnection of a known, activated peer. It
@@ -216,6 +203,14 @@ public:
onPeerDisconnect (Peer::ptr const& peer);
private:
+ void
+ sendpeers();
+
+ void
+ autoconnect();
+
+ void
+ do_timer (yield_context yield);
};
} // ripple
diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp
index 5a00cca4f9..3e37395b78 100644
--- a/src/ripple/overlay/impl/PeerImp.cpp
+++ b/src/ripple/overlay/impl/PeerImp.cpp
@@ -21,6 +21,7 @@
#include
#include
#include
+#include
namespace ripple {
@@ -86,20 +87,15 @@ PeerImp::start ()
}
void
-PeerImp::activate ()
+PeerImp::close()
{
- assert (state_ == stateHandshaked);
- state_ = stateActive;
- assert(shortId_ == 0);
- shortId_ = overlay_.next_id();
- overlay_.onPeerActivated(shared_from_this ());
-}
+ if (! strand_.running_in_this_thread())
+ return strand_.post (std::bind (
+ &PeerImp::close, shared_from_this()));
-void
-PeerImp::close (bool graceful)
-{
- was_canceled_ = true;
- detach ("stop", graceful);
+ error_code ec;
+ timer_.cancel (ec);
+ socket_->close(ec);
}
//------------------------------------------------------------------------------
@@ -309,10 +305,7 @@ void PeerImp::do_connect ()
usage_ = resourceManager_.newOutboundEndpoint (remote_address_);
if (usage_.disconnect ())
- {
- detach ("do_connect");
- return;
- }
+ return detach ("do_connect");
boost::system::error_code ec;
timer_.expires_from_now (nodeVerifySeconds, ec);
@@ -321,8 +314,7 @@ void PeerImp::do_connect ()
if (ec)
{
journal_.error << "Failed to set verify timer.";
- detach ("do_connect");
- return;
+ return detach ("do_connect");
}
socket_->next_layer ().async_connect (
@@ -347,15 +339,15 @@ PeerImp::on_connect (error_code ec)
journal_.error <<
"Connect to " << remote_address_ <<
" failed: " << ec.message();
- detach ("hc");
- return;
+ return detach ("hc");
}
assert (state_ == stateConnecting);
state_ = stateConnected;
- peerFinder_.on_connected (slot_,
- beast::IPAddressConversion::from_asio (local_endpoint));
+ if (! peerFinder_.connected (slot_,
+ beast::IPAddressConversion::from_asio (local_endpoint)))
+ return detach("dup");
socket_->set_verify_mode (boost::asio::ssl::verify_none);
socket_->async_handshake (
@@ -895,8 +887,6 @@ PeerImp::on_message (std::shared_ptr const& m)
{
error_code ec;
- bool bDetach (true);
-
timer_.cancel ();
std::uint32_t const ourTime (getApp().getOPs ().getNetworkTimeNC ());
@@ -915,6 +905,8 @@ PeerImp::on_message (std::shared_ptr const& m)
auto protocol = BuildInfo::make_protocol(m->protoversion());
+ // VFALCO TODO Report these failures in the HTTP response
+
if (m->has_nettime () &&
((m->nettime () < minTime) || (m->nettime () > maxTime)))
{
@@ -974,46 +966,61 @@ PeerImp::on_message (std::shared_ptr const& m)
"Connected to cluster node " << name_;
assert (state_ == stateConnected);
+ // VFALCO TODO Remove this needless state
state_ = stateHandshaked;
- peerFinder_.on_handshake (slot_, RipplePublicKey (publicKey_),
- clusterNode_);
+ auto const result = peerFinder_.activate (slot_,
+ RipplePublicKey (publicKey_), clusterNode_);
- // XXX Set timer: connection is in grace period to be useful.
- // XXX Set timer: connection idle (idle may vary depending on connection type.)
- if ((hello_.has_ledgerclosed ()) && (
- hello_.ledgerclosed ().size () == (256 / 8)))
+ if (result == PeerFinder::Result::success)
{
- memcpy (closedLedgerHash_.begin (),
- hello_.ledgerclosed ().data (), 256 / 8);
+ state_ = stateActive;
+ assert(shortId_ == 0);
+ shortId_ = overlay_.next_id();
+ overlay_.activate(shared_from_this ());
- if ((hello_.has_ledgerprevious ()) &&
- (hello_.ledgerprevious ().size () == (256 / 8)))
+ // XXX Set timer: connection is in grace period to be useful.
+ // XXX Set timer: connection idle (idle may vary depending on connection type.)
+ if ((hello_.has_ledgerclosed ()) && (
+ hello_.ledgerclosed ().size () == (256 / 8)))
{
- memcpy (previousLedgerHash_.begin (),
- hello_.ledgerprevious ().data (), 256 / 8);
- addLedger (previousLedgerHash_);
- }
- else
- {
- previousLedgerHash_.zero ();
+ memcpy (closedLedgerHash_.begin (),
+ hello_.ledgerclosed ().data (), 256 / 8);
+
+ if ((hello_.has_ledgerprevious ()) &&
+ (hello_.ledgerprevious ().size () == (256 / 8)))
+ {
+ memcpy (previousLedgerHash_.begin (),
+ hello_.ledgerprevious ().data (), 256 / 8);
+ addLedger (previousLedgerHash_);
+ }
+ else
+ {
+ previousLedgerHash_.zero();
+ }
}
+
+ sendGetPeers();
+ return ec;
}
- bDetach = false;
+ if (result == PeerFinder::Result::full)
+ {
+ // TODO Provide correct HTTP response
+ auto const redirects = peerFinder_.redirect (slot_);
+ send_endpoints (redirects.begin(), redirects.end());
+ }
+ else
+ {
+ // TODO Duplicate connection
+ }
}
- if (bDetach)
- {
- //publicKey_.clear ();
- //detach ("recvh");
-
- ec = invalid_argument_error();
- }
- else
- {
- sendGetPeers ();
- }
+ // VFALCO Commented this out because we return an error code
+ // to the caller, who calls detach for us.
+ //publicKey_.clear ();
+ //detach ("recvh");
+ ec = invalid_argument_error();
return ec;
}
@@ -2114,10 +2121,7 @@ PeerImp::detach (const char* rsn, bool graceful)
// to have PeerFinder work reliably.
detaching_ = true; // Race is ok.
- if (was_canceled_)
- peerFinder_.on_cancel (slot_);
- else
- peerFinder_.on_closed (slot_);
+ peerFinder_.on_closed (slot_);
if (state_ == stateActive)
overlay_.onPeerDisconnect (shared_from_this ());
diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h
index 5883d74425..a274f2be80 100644
--- a/src/ripple/overlay/impl/PeerImp.h
+++ b/src/ripple/overlay/impl/PeerImp.h
@@ -169,9 +169,6 @@ private:
// The slot assigned to us by PeerFinder
PeerFinder::Slot::ptr slot_;
- // True if close was called
- bool was_canceled_ = false;
-
boost::asio::streambuf read_buffer_;
boost::optional http_message_;
boost::optional http_parser_;
@@ -211,16 +208,9 @@ public:
void
start ();
- /** Indicates that the peer must be activated.
- A peer is activated after the handshake is completed and if it is not
- a second connection from a peer that we already have. Once activated
- the peer transitions to `stateActive` and begins operating.
- */
+ // Cancel all I/O and close the socket
void
- activate ();
-
- /** Close the connection. */
- void close (bool graceful);
+ close();
void
getLedger (protocol::TMGetLedger& packet);
@@ -232,6 +222,13 @@ public:
void
send (Message::pointer const& m) override;
+ /** Send a set of PeerFinder endpoints as a protocol message. */
+ template ::value_type,
+ PeerFinder::Endpoint>::value>>
+ void
+ send_endpoints (FwdIt first, FwdIt last);
+
beast::IP::Endpoint
getRemoteAddress() const override;
@@ -246,7 +243,7 @@ public:
getShortId () const override;
RippleAddress const&
- getNodePublic () const;
+ getNodePublic () const override;
Json::Value
json() override;
@@ -512,6 +509,31 @@ private:
//------------------------------------------------------------------------------
+template
+void
+PeerImp::send_endpoints (FwdIt first, FwdIt last)
+{
+ protocol::TMEndpoints tm;
+ for (;first != last; ++first)
+ {
+ auto const& ep = *first;
+ protocol::TMEndpoint& tme (*tm.add_endpoints());
+ if (ep.address.is_v4())
+ tme.mutable_ipv4()->set_ipv4(
+ beast::toNetworkByteOrder (ep.address.to_v4().value));
+ else
+ tme.mutable_ipv4()->set_ipv4(0);
+ tme.mutable_ipv4()->set_ipv4port (ep.address.port());
+ tme.set_hops (ep.hops);
+ }
+ tm.set_version (1);
+
+ send (std::make_shared (tm, protocol::mtENDPOINTS));
+}
+
+//------------------------------------------------------------------------------
+
+// DEPRECATED
const boost::posix_time::seconds PeerImp::nodeVerifySeconds (15);
//------------------------------------------------------------------------------
diff --git a/src/ripple/peerfinder/Manager.h b/src/ripple/peerfinder/Manager.h
index 5d72c5f5be..50d5cfb496 100644
--- a/src/ripple/peerfinder/Manager.h
+++ b/src/ripple/peerfinder/Manager.h
@@ -103,31 +103,6 @@ typedef std::vector Endpoints;
//------------------------------------------------------------------------------
-/** The Callback receives PeerFinder notifications.
- The notifications are sent on a thread owned by the PeerFinder,
- so it is best not to do too much work in here. Just post functor
- to another worker thread or job queue and return.
-*/
-// DEPRECATED Callbacks only cause re-entrancy pain
-struct Callback
-{
- /** Initiate outgoing Peer connections to the specified set of endpoints. */
- virtual void connect (IPAddresses const& addresses) = 0;
-
- /** Activate the handshaked peer with the specified address. */
- virtual void activate (Slot::ptr const& slot) = 0;
-
- /** Sends a set of Endpoint records to the specified peer. */
- virtual void send (Slot::ptr const& slot, Endpoints const& endpoints) = 0;
-
- /** Disconnect the handshaked peer with the specified address.
- @param graceful `true` to wait for send buffers to drain before closing.
- */
- virtual void disconnect (Slot::ptr const& slot, bool graceful) = 0;
-};
-
-//------------------------------------------------------------------------------
-
/** Possible results from activating a slot. */
enum class Result
{
@@ -146,20 +121,16 @@ protected:
public:
/** Create a new Manager. */
- static Manager* New (
- Stoppable& parent,
- SiteFiles::Manager& siteFiles,
+ static Manager* New (Stoppable& parent,
beast::File const& pathToDbFileOrDirectory,
- Callback& callback,
- clock_type& clock,
- beast::Journal journal);
+ clock_type& clock, beast::Journal journal);
/** Destroy the object.
Any pending source fetch operations are aborted.
There may be some listener calls made before the
destructor returns.
*/
- virtual ~Manager () { }
+ virtual ~Manager() = default;
/** Set the configuration for the manager.
The new settings will be applied asynchronously.
@@ -207,19 +178,6 @@ public:
virtual Slot::ptr new_outbound_slot (
beast::IP::Endpoint const& remote_endpoint) = 0;
- /** Called when an outbound connection attempt succeeds.
- The local endpoint must be valid. If the caller receives an error
- when retrieving the local endpoint from the socket, it should
- proceed as if the connection attempt failed by calling on_closed
- instead of on_connected.
- */
- virtual void on_connected (Slot::ptr const& slot,
- beast::IP::Endpoint const& local_endpoint) = 0;
-
- /** Called when a handshake is completed. */
- virtual void on_handshake (Slot::ptr const& slot,
- RipplePublicKey const& key, bool cluster) = 0;
-
/** Called when mtENDPOINTS is received. */
virtual void on_endpoints (Slot::ptr const& slot,
Endpoints const& endpoints) = 0;
@@ -233,11 +191,46 @@ public:
*/
virtual void on_closed (Slot::ptr const& slot) = 0;
- /** Called when the slot is closed via canceling operations.
- This is instead of on_closed.
- */
- virtual void on_cancel (Slot::ptr const& slot) = 0;
+ //--------------------------------------------------------------------------
+ /** Called when an outbound connection attempt succeeds.
+ The local endpoint must be valid. If the caller receives an error
+ when retrieving the local endpoint from the socket, it should
+ proceed as if the connection attempt failed by calling on_closed
+ instead of on_connected.
+ @return `true` if the connection should be kept
+ */
+ virtual
+ bool
+ connected (Slot::ptr const& slot,
+ beast::IP::Endpoint const& local_endpoint) = 0;
+
+ /** Request an active slot type. */
+ virtual
+ Result
+ activate (Slot::ptr const& slot,
+ RipplePublicKey const& key, bool cluster) = 0;
+
+ /** Returns a set of endpoints suitable for redirection. */
+ virtual
+ std::vector
+ redirect (Slot::ptr const& slot) = 0;
+
+ /** Return a set of addresses we should connect to. */
+ virtual
+ std::vector
+ autoconnect() = 0;
+
+ virtual
+ std::vector>>
+ sendpeers() = 0;
+
+ /** Perform periodic activity.
+ This should be called once per second.
+ */
+ virtual
+ void
+ once_per_second() = 0;
};
}
diff --git a/src/ripple/peerfinder/impl/Checker.cpp b/src/ripple/peerfinder/impl/Checker.cpp
deleted file mode 100644
index 3f0e7ed73a..0000000000
--- a/src/ripple/peerfinder/impl/Checker.cpp
+++ /dev/null
@@ -1,181 +0,0 @@
-//------------------------------------------------------------------------------
-/*
- This file is part of rippled: https://github.com/ripple/rippled
- Copyright (c) 2012, 2013 Ripple Labs Inc.
-
- Permission to use, copy, modify, and/or distribute this software for any
- purpose with or without fee is hereby granted, provided that the above
- copyright notice and this permission notice appear in all copies.
-
- THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-*/
-//==============================================================================
-
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-
-namespace ripple {
-namespace PeerFinder {
-
-class CheckerImp
- : public Checker
- , private beast::Thread
- , private beast::LeakChecked
-{
-private:
- class Request;
-
- struct State
- {
- beast::List list;
- };
-
- typedef beast::SharedData SharedState;
-
- SharedState m_state;
- boost::asio::io_service m_io_service;
- boost::optional m_work;
-
- //--------------------------------------------------------------------------
-
- class Request
- : public beast::SharedObject
- , public beast::List ::Node
- , private beast::LeakChecked
- {
- public:
- typedef beast::SharedPtr Ptr;
- typedef boost::asio::ip::tcp Protocol;
- typedef boost::system::error_code error_code;
- typedef Protocol::socket socket_type;
- typedef Protocol::endpoint endpoint_type;
-
- CheckerImp& m_owner;
- boost::asio::io_service& m_io_service;
- beast::IP::Endpoint m_address;
- beast::asio::shared_handler m_handler;
- socket_type m_socket;
- boost::system::error_code m_error;
- bool m_canAccept;
-
- Request (CheckerImp& owner, boost::asio::io_service& io_service,
- beast::IP::Endpoint const& address, beast::asio::shared_handler <
- void (Result)> const& handler)
- : m_owner (owner)
- , m_io_service (io_service)
- , m_address (address)
- , m_handler (handler)
- , m_socket (m_io_service)
- , m_canAccept (false)
- {
- m_owner.add (*this);
-
- m_socket.async_connect (beast::IPAddressConversion::to_asio_endpoint (
- m_address), beast::asio::wrap_handler (std::bind (
- &Request::handle_connect, Ptr(this),
- beast::asio::placeholders::error), m_handler));
- }
-
- ~Request ()
- {
- Result result;
- result.address = m_address;
- result.error = m_error;
- m_io_service.wrap (m_handler) (result);
-
- m_owner.remove (*this);
- }
-
- void cancel ()
- {
- m_socket.cancel();
- }
-
- void handle_connect (boost::system::error_code ec)
- {
- m_error = ec;
- if (ec)
- return;
-
- m_canAccept = true;
- }
- };
-
- //--------------------------------------------------------------------------
-
- void add (Request& request)
- {
- SharedState::Access state (m_state);
- state->list.push_back (request);
- }
-
- void remove (Request& request)
- {
- SharedState::Access state (m_state);
- state->list.erase (state->list.iterator_to (request));
- }
-
- void run ()
- {
- m_io_service.run ();
- }
-
-public:
- CheckerImp ()
- : Thread ("PeerFinder::Checker")
- , m_work (boost::in_place (std::ref (m_io_service)))
- {
- startThread ();
- }
-
- ~CheckerImp ()
- {
- // cancel pending i/o
- cancel();
-
- // destroy the io_service::work object
- m_work = boost::none;
-
- // signal and wait for the thread to exit gracefully
- stopThread ();
- }
-
- void cancel ()
- {
- SharedState::Access state (m_state);
- for (beast::List ::iterator iter (state->list.begin());
- iter != state->list.end(); ++iter)
- iter->cancel();
- }
-
- void async_test (beast::IP::Endpoint const& endpoint,
- beast::asio::shared_handler handler)
- {
- new Request (*this, m_io_service, endpoint, handler);
- }
-};
-
-//------------------------------------------------------------------------------
-
-Checker* Checker::New ()
-{
- return new CheckerImp;
-}
-
-}
-}
diff --git a/src/ripple/peerfinder/impl/Checker.h b/src/ripple/peerfinder/impl/Checker.h
index 707324a4e1..dae5898bdb 100644
--- a/src/ripple/peerfinder/impl/Checker.h
+++ b/src/ripple/peerfinder/impl/Checker.h
@@ -20,20 +20,79 @@
#ifndef RIPPLE_PEERFINDER_CHECKER_H_INCLUDED
#define RIPPLE_PEERFINDER_CHECKER_H_INCLUDED
-#include
+#include
+#include
+#include
+#include
+#include
+#include
#include
+#include
+#include
+#include
+#include
namespace ripple {
namespace PeerFinder {
/** Tests remote listening sockets to make sure they are connectible. */
+ template
class Checker
{
+private:
+ using error_code = boost::system::error_code;
+
+ struct basic_async_op : boost::intrusive::list_base_hook <
+ boost::intrusive::link_mode >
+ {
+ virtual
+ ~basic_async_op() = default;
+
+ virtual
+ void
+ stop() = 0;
+
+ virtual
+ void
+ operator() (error_code const& ec) = 0;
+ };
+
+ template
+ struct async_op : basic_async_op
+ {
+ using socket_type = typename Protocol::socket;
+ using endpoint_type = typename Protocol::endpoint;
+
+ Checker& checker_;
+ socket_type socket_;
+ Handler handler_;
+
+ async_op (Checker& owner, boost::asio::io_service& io_service,
+ Handler&& handler);
+
+ ~async_op();
+
+ void
+ stop() override;
+
+ void
+ operator() (error_code const& ec) override;
+ };
+
+ //--------------------------------------------------------------------------
+
+ using list_type = typename boost::intrusive::make_list >::type;
+
+ std::mutex mutex_;
+ std::condition_variable cond_;
+ boost::asio::io_service& io_service_;
+ list_type list_;
+ bool stop_ = false;
+
public:
- /** Create the service.
- This will automatically start the associated thread and io_service.
- */
- static Checker* New ();
+ explicit
+ Checker (boost::asio::io_service& io_service);
/** Destroy the service.
Any pending I/O operations will be canceled. This call blocks until
@@ -41,40 +100,134 @@ public:
operation_aborted) and the associated thread and io_service have
no more work remaining.
*/
- virtual ~Checker () { }
+ ~Checker();
- /** Cancel pending I/O.
+ /** Stop the service.
+ Pending I/O operations will be canceled.
This issues cancel orders for all pending I/O operations and then
returns immediately. Handlers will receive operation_aborted errors,
or if they were already queued they will complete normally.
*/
- virtual void cancel () = 0;
+ void
+ stop();
- struct Result
- {
- Result ()
- : canAccept (false)
- { }
-
- /** The original address. */
- beast::IP::Endpoint address;
-
- /** The error code from the operation. */
- boost::system::error_code error;
-
- /** `true` if the endpoint is reachable, else `false`.
- Only defined if no error occurred.
- */
- bool canAccept;
- };
+ /** Block until all pending I/O completes. */
+ void
+ wait();
/** Performs an async connection test on the specified endpoint.
- The port must be non-zero.
+ The port must be non-zero. Note that the execution guarantees
+ offered by asio handlers are NOT enforced.
*/
- virtual void async_test (beast::IP::Endpoint const& endpoint,
- beast::asio::shared_handler handler) = 0;
+ template
+ void
+ async_connect (beast::IP::Endpoint const& endpoint, Handler&& handler);
+
+private:
+ void
+ remove (basic_async_op& op);
};
+//------------------------------------------------------------------------------
+
+template
+template
+Checker::async_op::async_op (Checker& owner,
+ boost::asio::io_service& io_service, Handler&& handler)
+ : checker_ (owner)
+ , socket_ (io_service)
+ , handler_ (std::forward(handler))
+{
+}
+
+template
+template
+Checker::async_op::~async_op()
+{
+ checker_.remove (*this);
+}
+
+template
+template
+void
+Checker::async_op::stop()
+{
+ error_code ec;
+ socket_.cancel(ec);
+}
+
+template
+template
+void
+Checker::async_op::operator() (
+ error_code const& ec)
+{
+ handler_(ec);
+}
+
+//------------------------------------------------------------------------------
+
+template
+Checker::Checker (boost::asio::io_service& io_service)
+ : io_service_(io_service)
+{
+}
+
+template
+Checker::~Checker()
+{
+ wait();
+}
+
+template
+void
+Checker::stop()
+{
+ std::lock_guard lock (mutex_);
+ if (! stop_)
+ {
+ stop_ = true;
+ for (auto& c : list_)
+ c.stop();
+ }
+}
+
+template
+void
+Checker::wait()
+{
+ std::unique_lock lock (mutex_);
+ while (! list_.empty())
+ cond_.wait (lock);
+}
+
+template
+template
+void
+Checker::async_connect (
+ beast::IP::Endpoint const& endpoint, Handler&& handler)
+{
+ auto const op = std::make_shared> (
+ *this, io_service_, std::forward(handler));
+ {
+ std::lock_guard lock (mutex_);
+ list_.push_back (*op);
+ }
+ op->socket_.async_connect (beast::IPAddressConversion::to_asio_endpoint (
+ endpoint), std::bind (&basic_async_op::operator(), op,
+ beast::asio::placeholders::error));
+}
+
+template
+void
+Checker::remove (basic_async_op& op)
+{
+ std::lock_guard lock (mutex_);
+ list_.erase (list_.iterator_to (op));
+ if (list_.size() == 0)
+ cond_.notify_all();
+}
+
}
}
diff --git a/src/ripple/peerfinder/impl/CheckerAdapter.h b/src/ripple/peerfinder/impl/CheckerAdapter.h
deleted file mode 100644
index f056b7e1d3..0000000000
--- a/src/ripple/peerfinder/impl/CheckerAdapter.h
+++ /dev/null
@@ -1,105 +0,0 @@
-//------------------------------------------------------------------------------
-/*
- This file is part of rippled: https://github.com/ripple/rippled
- Copyright (c) 2012, 2013 Ripple Labs Inc.
-
- Permission to use, copy, modify, and/or distribute this software for any
- purpose with or without fee is hereby granted, provided that the above
- copyright notice and this permission notice appear in all copies.
-
- THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-*/
-//==============================================================================
-
-#ifndef RIPPLE_PEERFINDER_CHECKERADAPTER_H_INCLUDED
-#define RIPPLE_PEERFINDER_CHECKERADAPTER_H_INCLUDED
-
-#include
-#include
-#include
-#include
-
-namespace ripple {
-namespace PeerFinder {
-
-// Ensures that all Logic member function entry points are
-// called while holding a lock on the recursive mutex.
-//
-typedef beast::ScopedWrapperContext <
- beast::RecursiveMutex, beast::RecursiveMutex::ScopedLockType> SerializedContext;
-
-/** Adapts a ServiceQueue to dispatch Checker handler completions.
- This lets the Logic have its Checker handler get dispatched
- on the ServiceQueue instead of an io_service thread. Otherwise,
- Logic would need a ServiceQueue to dispatch from its handler.
-*/
-class CheckerAdapter : public Checker
-{
-private:
- SerializedContext& m_context;
- beast::ServiceQueue& m_queue;
- std::unique_ptr m_checker;
-
- struct Handler
- {
- SerializedContext& m_context;
- beast::ServiceQueue& m_queue;
- beast::asio::shared_handler m_handler;
-
- Handler (
- SerializedContext& context,
- beast::ServiceQueue& queue,
- beast::asio::shared_handler const& handler)
- : m_context (context)
- , m_queue (queue)
- , m_handler (handler)
- { }
-
- void operator() (Checker::Result result)
- {
- // VFALCO TODO Fix this, it is surely wrong but
- // this supposedly correct line doesn't compile
- //m_queue.wrap (m_context.wrap (m_handler)) (result);
-
- // WRONG
- m_queue.wrap (m_handler) (result);
- }
- };
-
-public:
- CheckerAdapter (SerializedContext& context, beast::ServiceQueue& queue)
- : m_context (context)
- , m_queue (queue)
- , m_checker (Checker::New())
- {
- }
-
- ~CheckerAdapter ()
- {
- // Have to do this before other fields get destroyed
- m_checker = nullptr;
- }
-
- void cancel ()
- {
- m_checker->cancel();
- }
-
- void async_test (beast::IP::Endpoint const& endpoint,
- beast::asio::shared_handler handler)
- {
- m_checker->async_test (endpoint, Handler (
- m_context, m_queue, handler));
- }
-};
-
-}
-}
-
-#endif
diff --git a/src/ripple/peerfinder/impl/Handouts.h b/src/ripple/peerfinder/impl/Handouts.h
index 132d3ff5d6..5747d5a76f 100644
--- a/src/ripple/peerfinder/impl/Handouts.h
+++ b/src/ripple/peerfinder/impl/Handouts.h
@@ -116,6 +116,11 @@ public:
return slot_;
}
+ std::vector & list()
+ {
+ return list_;
+ }
+
std::vector const& list() const
{
return list_;
diff --git a/src/ripple/peerfinder/impl/Logic.h b/src/ripple/peerfinder/impl/Logic.h
index 57cde0e07b..a9b35f4e3e 100644
--- a/src/ripple/peerfinder/impl/Logic.h
+++ b/src/ripple/peerfinder/impl/Logic.h
@@ -28,9 +28,8 @@
#include
#include
#include
-
#include
-
+#include
#include
#include