Refactor PeerFinder:

Previously, the PeerFinder manager was constructed with a Callback object
provided by the owner, which was used to perform operations such as connecting,
disconnecting, and sending messages. This made it difficult to change the
overlay code, because a single call into the PeerFinder could cause both
OverlayImpl and PeerImp to be re-entered one or more times, sometimes while
holding a recursive mutex. This change eliminates the callback by changing
PeerFinder functions to return values indicating the action the caller should
take.

As a result of this change the PeerFinder no longer needs its own dedicated
thread. OverlayImpl is changed to call into PeerFinder on a timer to perform
periodic activities. Furthermore, the Checker class used to perform connectivity
checks has been refactored: it no longer uses an abstract base class, so that
the handler passed to async_connect is not type-erased (ensuring compatibility
with coroutines). To allow unit tests that don't need a network, the Logic
class is now templated on the Checker type. Currently the Manager provides its
own io_service; however, this can easily be changed so that the io_service is
provided upon construction.

Summary
* Remove unused SiteFiles dependency injection
* Remove Callback and update signatures for public APIs
* Remove obsolete functions
* Move timer to overlay
* Steps toward a shared io_service
* Templated, simplified Checker
* Tidy up Checker declaration
This commit is contained in:
Vinnie Falco
2014-10-07 18:00:14 -07:00
parent 5f59282ba1
commit 7c0c2419f7
18 changed files with 816 additions and 1120 deletions

View File

@@ -2837,13 +2837,8 @@
</ClCompile>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Bootcache.h">
</ClInclude>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\Checker.cpp">
<ExcludedFromBuild>True</ExcludedFromBuild>
</ClCompile>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Checker.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\CheckerAdapter.h">
</ClInclude>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\Config.cpp">
<ExcludedFromBuild>True</ExcludedFromBuild>
</ClCompile>

View File

@@ -3984,15 +3984,9 @@
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Bootcache.h">
<Filter>ripple\peerfinder\impl</Filter>
</ClInclude>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\Checker.cpp">
<Filter>ripple\peerfinder\impl</Filter>
</ClCompile>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Checker.h">
<Filter>ripple\peerfinder\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\CheckerAdapter.h">
<Filter>ripple\peerfinder\impl</Filter>
</ClInclude>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\Config.cpp">
<Filter>ripple\peerfinder\impl</Filter>
</ClCompile>

View File

@@ -20,7 +20,6 @@
#include <ripple/overlay/impl/OverlayImpl.h>
#include <ripple/overlay/impl/PeerDoor.h>
#include <ripple/overlay/impl/PeerImp.h>
#include <beast/ByteOrder.h>
#if DOXYGEN
@@ -63,15 +62,12 @@ OverlayImpl::OverlayImpl (Stoppable& parent,
, m_child_count (1)
, m_journal (deprecatedLogs().journal("Overlay"))
, m_resourceManager (resourceManager)
, m_peerFinder (add (PeerFinder::Manager::New (
*this,
siteFiles,
pathToDbFileOrDirectory,
*this,
get_seconds_clock (),
, m_peerFinder (add (PeerFinder::Manager::New (*this,
pathToDbFileOrDirectory, get_seconds_clock (),
deprecatedLogs().journal("PeerFinder"))))
, m_io_service (io_service)
, m_ssl_context (ssl_context)
, timer_(io_service)
, m_resolver (resolver)
, m_nextShortId (0)
{
@@ -222,89 +218,6 @@ OverlayImpl::remove (PeerFinder::Slot::ptr const& slot)
release();
}
//--------------------------------------------------------------------------
//
// PeerFinder::Callback
//
//--------------------------------------------------------------------------
/** Initiate outgoing connection attempts to every address in the list. */
void
OverlayImpl::connect (std::vector <beast::IP::Endpoint> const& list)
{
    for (auto const& endpoint : list)
        connect (endpoint);
}
/** Activate the peer associated with the given PeerFinder slot.
    Looks up the PeerImp assigned to `slot` and forwards the activation.
    The map lookup and weak_ptr promotion are performed under m_mutex;
    the slot is expected to be present and its peer alive (asserted).
*/
void
OverlayImpl::activate (PeerFinder::Slot::ptr const& slot)
{
    m_journal.trace <<
        "Activate " << slot->remote_endpoint();
    std::lock_guard <decltype(m_mutex)> lock (m_mutex);
    // Find the peer assigned to this slot and promote the weak reference.
    PeersBySlot::iterator const iter (m_peers.find (slot));
    assert (iter != m_peers.end ());
    PeerImp::ptr const peer (iter->second.lock());
    assert (peer != nullptr);
    peer->activate ();
}
/** Send a set of PeerFinder endpoints to the peer on the given slot.
    Serializes the endpoints into a TMEndpoints protocol message and
    delivers it to the PeerImp assigned to `slot`. The message is built
    outside the lock; only the slot-to-peer lookup and the send are
    performed under m_mutex.
*/
void
OverlayImpl::send (PeerFinder::Slot::ptr const& slot,
    std::vector <PeerFinder::Endpoint> const& endpoints)
{
    typedef std::vector <PeerFinder::Endpoint> List;
    protocol::TMEndpoints tm;
    for (List::const_iterator iter (endpoints.begin());
        iter != endpoints.end(); ++iter)
    {
        PeerFinder::Endpoint const& ep (*iter);
        protocol::TMEndpoint& tme (*tm.add_endpoints());
        // The wire format only carries IPv4 addresses; non-v4 addresses
        // are encoded as 0.
        if (ep.address.is_v4())
            tme.mutable_ipv4()->set_ipv4(
                beast::toNetworkByteOrder (ep.address.to_v4().value));
        else
            tme.mutable_ipv4()->set_ipv4(0);
        tme.mutable_ipv4()->set_ipv4port (ep.address.port());
        tme.set_hops (ep.hops);
    }
    tm.set_version (1);
    Message::pointer msg (
        std::make_shared <Message> (
            tm, protocol::mtENDPOINTS));
    {
        // Resolve the slot to a live peer and send under the lock.
        std::lock_guard <decltype(m_mutex)> lock (m_mutex);
        PeersBySlot::iterator const iter (m_peers.find (slot));
        assert (iter != m_peers.end ());
        PeerImp::ptr const peer (iter->second.lock());
        assert (peer != nullptr);
        peer->send (msg);
    }
}
/** Disconnect the peer associated with the given PeerFinder slot.
    @param graceful `true` to allow send buffers to drain before closing.
*/
void
OverlayImpl::disconnect (PeerFinder::Slot::ptr const& slot, bool graceful)
{
    if (m_journal.trace) m_journal.trace <<
        "Disconnect " << slot->remote_endpoint () <<
        (graceful ? " gracefully" : "");
    std::lock_guard <decltype(m_mutex)> lock (m_mutex);
    // The slot must still map to a live peer (asserted).
    PeersBySlot::iterator const iter (m_peers.find (slot));
    assert (iter != m_peers.end ());
    PeerImp::ptr const peer (iter->second.lock());
    assert (peer != nullptr);
    peer->close (graceful);
    //peer->detach ("disc", false);
}
//--------------------------------------------------------------------------
//
// Stoppable
@@ -405,6 +318,10 @@ OverlayImpl::onPrepare ()
void
OverlayImpl::onStart ()
{
// mutex not needed since we aren't running
++m_child_count;
boost::asio::spawn (m_io_service, std::bind (
&OverlayImpl::do_timer, this, std::placeholders::_1));
}
/** Close all peer connections.
@@ -423,13 +340,16 @@ OverlayImpl::close_all (bool graceful)
// ~PeerImp is pre-empted before it calls m_peers.remove()
//
if (peer != nullptr)
peer->close (graceful);
peer->close();
}
}
void
OverlayImpl::onStop ()
{
error_code ec;
timer_.cancel(ec);
if (m_doorDirect)
m_doorDirect->stop();
if (m_doorProxy)
@@ -467,7 +387,7 @@ OverlayImpl::onWrite (beast::PropertyStream::Map& stream)
are known.
*/
void
OverlayImpl::onPeerActivated (Peer::ptr const& peer)
OverlayImpl::activate (Peer::ptr const& peer)
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
@@ -561,6 +481,56 @@ OverlayImpl::findPeerByShortID (Peer::ShortId const& id)
//------------------------------------------------------------------------------
/** Ask PeerFinder for addresses and begin outbound connections to each. */
void
OverlayImpl::autoconnect()
{
    auto const addresses = m_peerFinder->autoconnect();
    for (auto const& address : addresses)
        connect (address);
}
/** Deliver pending PeerFinder endpoint batches to their peers.
    Fetches per-slot batches of endpoints from PeerFinder, resolves each
    slot to a live peer under m_mutex, then sends the batch outside the
    lock via PeerImp::send_endpoints.
*/
void
OverlayImpl::sendpeers()
{
    auto const result = m_peerFinder->sendpeers();
    for (auto const& e : result)
    {
        // VFALCO TODO Make sure this doesn't race with closing the peer
        PeerImp::ptr peer;
        {
            std::lock_guard <decltype(m_mutex)> lock (m_mutex);
            // The slot may already be gone; skip silently in that case.
            PeersBySlot::iterator const iter = m_peers.find (e.first);
            if (iter != m_peers.end())
                peer = iter->second.lock();
        }
        if (peer)
            peer->send_endpoints (e.second.begin(), e.second.end());
    }
}
/** Coroutine driving PeerFinder's periodic activity.
    Loops once per second: performs PeerFinder housekeeping, sends queued
    endpoint messages, and initiates autoconnect attempts. Exits when the
    timer is canceled (operation_aborted), then releases the child
    reference that was taken when the coroutine was spawned.
*/
void
OverlayImpl::do_timer (yield_context yield)
{
    for(;;)
    {
        m_peerFinder->once_per_second();
        sendpeers();
        autoconnect();
        timer_.expires_from_now (std::chrono::seconds(1));
        error_code ec;
        timer_.async_wait (yield[ec]);
        // Cancellation (from onStop) surfaces here as operation_aborted.
        if (ec == boost::asio::error::operation_aborted)
            break;
    }
    // Take off a reference
    std::lock_guard <decltype(m_mutex)> lock (m_mutex);
    release();
}
//------------------------------------------------------------------------------
std::unique_ptr <Overlay>
make_Overlay (
beast::Stoppable& parent,

View File

@@ -27,13 +27,14 @@
#include <ripple/common/UnorderedContainers.h>
#include <ripple/peerfinder/Manager.h>
#include <ripple/resource/api/Manager.h>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/context.hpp>
#include <boost/asio/basic_waitable_timer.hpp>
#include <boost/asio/spawn.hpp>
#include <beast/cxx14/memory.h> // <memory>
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <unordered_map>
@@ -43,12 +44,13 @@ namespace ripple {
class PeerDoor;
class PeerImp;
class OverlayImpl
: public Overlay
, public PeerFinder::Callback
class OverlayImpl : public Overlay
{
private:
typedef boost::asio::ip::tcp::socket socket_type;
using clock_type = std::chrono::steady_clock;
using socket_type = boost::asio::ip::tcp::socket;
using error_code = boost::system::error_code;
using yield_context = boost::asio::yield_context;
typedef hash_map <PeerFinder::Slot::ptr,
std::weak_ptr <PeerImp>> PeersBySlot;
@@ -57,6 +59,7 @@ private:
typedef hash_map <Peer::ShortId, Peer::ptr> PeerByShortId;
// VFALCO TODO Change to regular mutex and eliminate re-entrancy
std::recursive_mutex m_mutex;
// Blocks us until dependent objects have been destroyed
@@ -72,6 +75,7 @@ private:
boost::asio::io_service& m_io_service;
boost::asio::ssl::context& m_ssl_context;
boost::asio::basic_waitable_timer <clock_type> timer_;
/** Associates slots to peers. */
PeersBySlot m_peers;
@@ -149,23 +153,6 @@ public:
void
remove (PeerFinder::Slot::ptr const& slot);
//
// PeerFinder::Callback
//
void
connect (std::vector <beast::IP::Endpoint> const& list);
void
activate (PeerFinder::Slot::ptr const& slot);
void
send (PeerFinder::Slot::ptr const& slot,
std::vector <PeerFinder::Endpoint> const& endpoints);
void
disconnect (PeerFinder::Slot::ptr const& slot, bool graceful);
//
// Stoppable
//
@@ -205,7 +192,7 @@ public:
are known.
*/
void
onPeerActivated (Peer::ptr const& peer);
activate (Peer::ptr const& peer);
/** A peer is being disconnected
This is called during the disconnection of a known, activated peer. It
@@ -216,6 +203,14 @@ public:
onPeerDisconnect (Peer::ptr const& peer);
private:
void
sendpeers();
void
autoconnect();
void
do_timer (yield_context yield);
};
} // ripple

View File

@@ -21,6 +21,7 @@
#include <ripple/overlay/impl/PeerImp.h>
#include <ripple/overlay/impl/Tuning.h>
#include <beast/streams/debug_ostream.h>
#include <functional>
namespace ripple {
@@ -86,20 +87,15 @@ PeerImp::start ()
}
void
PeerImp::activate ()
PeerImp::close()
{
assert (state_ == stateHandshaked);
state_ = stateActive;
assert(shortId_ == 0);
shortId_ = overlay_.next_id();
overlay_.onPeerActivated(shared_from_this ());
}
if (! strand_.running_in_this_thread())
return strand_.post (std::bind (
&PeerImp::close, shared_from_this()));
void
PeerImp::close (bool graceful)
{
was_canceled_ = true;
detach ("stop", graceful);
error_code ec;
timer_.cancel (ec);
socket_->close(ec);
}
//------------------------------------------------------------------------------
@@ -309,10 +305,7 @@ void PeerImp::do_connect ()
usage_ = resourceManager_.newOutboundEndpoint (remote_address_);
if (usage_.disconnect ())
{
detach ("do_connect");
return;
}
return detach ("do_connect");
boost::system::error_code ec;
timer_.expires_from_now (nodeVerifySeconds, ec);
@@ -321,8 +314,7 @@ void PeerImp::do_connect ()
if (ec)
{
journal_.error << "Failed to set verify timer.";
detach ("do_connect");
return;
return detach ("do_connect");
}
socket_->next_layer <NativeSocketType>().async_connect (
@@ -347,15 +339,15 @@ PeerImp::on_connect (error_code ec)
journal_.error <<
"Connect to " << remote_address_ <<
" failed: " << ec.message();
detach ("hc");
return;
return detach ("hc");
}
assert (state_ == stateConnecting);
state_ = stateConnected;
peerFinder_.on_connected (slot_,
beast::IPAddressConversion::from_asio (local_endpoint));
if (! peerFinder_.connected (slot_,
beast::IPAddressConversion::from_asio (local_endpoint)))
return detach("dup");
socket_->set_verify_mode (boost::asio::ssl::verify_none);
socket_->async_handshake (
@@ -895,8 +887,6 @@ PeerImp::on_message (std::shared_ptr <protocol::TMHello> const& m)
{
error_code ec;
bool bDetach (true);
timer_.cancel ();
std::uint32_t const ourTime (getApp().getOPs ().getNetworkTimeNC ());
@@ -915,6 +905,8 @@ PeerImp::on_message (std::shared_ptr <protocol::TMHello> const& m)
auto protocol = BuildInfo::make_protocol(m->protoversion());
// VFALCO TODO Report these failures in the HTTP response
if (m->has_nettime () &&
((m->nettime () < minTime) || (m->nettime () > maxTime)))
{
@@ -974,10 +966,18 @@ PeerImp::on_message (std::shared_ptr <protocol::TMHello> const& m)
"Connected to cluster node " << name_;
assert (state_ == stateConnected);
// VFALCO TODO Remove this needless state
state_ = stateHandshaked;
peerFinder_.on_handshake (slot_, RipplePublicKey (publicKey_),
clusterNode_);
auto const result = peerFinder_.activate (slot_,
RipplePublicKey (publicKey_), clusterNode_);
if (result == PeerFinder::Result::success)
{
state_ = stateActive;
assert(shortId_ == 0);
shortId_ = overlay_.next_id();
overlay_.activate(shared_from_this ());
// XXX Set timer: connection is in grace period to be useful.
// XXX Set timer: connection idle (idle may vary depending on connection type.)
@@ -996,24 +996,31 @@ PeerImp::on_message (std::shared_ptr <protocol::TMHello> const& m)
}
else
{
previousLedgerHash_.zero ();
previousLedgerHash_.zero();
}
}
bDetach = false;
sendGetPeers();
return ec;
}
if (bDetach)
if (result == PeerFinder::Result::full)
{
//publicKey_.clear ();
//detach ("recvh");
ec = invalid_argument_error();
// TODO Provide correct HTTP response
auto const redirects = peerFinder_.redirect (slot_);
send_endpoints (redirects.begin(), redirects.end());
}
else
{
sendGetPeers ();
// TODO Duplicate connection
}
}
// VFALCO Commented this out because we return an error code
// to the caller, who calls detach for us.
//publicKey_.clear ();
//detach ("recvh");
ec = invalid_argument_error();
return ec;
}
@@ -2114,9 +2121,6 @@ PeerImp::detach (const char* rsn, bool graceful)
// to have PeerFinder work reliably.
detaching_ = true; // Race is ok.
if (was_canceled_)
peerFinder_.on_cancel (slot_);
else
peerFinder_.on_closed (slot_);
if (state_ == stateActive)

View File

@@ -169,9 +169,6 @@ private:
// The slot assigned to us by PeerFinder
PeerFinder::Slot::ptr slot_;
// True if close was called
bool was_canceled_ = false;
boost::asio::streambuf read_buffer_;
boost::optional <beast::http::message> http_message_;
boost::optional <beast::http::parser> http_parser_;
@@ -211,16 +208,9 @@ public:
void
start ();
/** Indicates that the peer must be activated.
A peer is activated after the handshake is completed and if it is not
a second connection from a peer that we already have. Once activated
the peer transitions to `stateActive` and begins operating.
*/
// Cancel all I/O and close the socket
void
activate ();
/** Close the connection. */
void close (bool graceful);
close();
void
getLedger (protocol::TMGetLedger& packet);
@@ -232,6 +222,13 @@ public:
void
send (Message::pointer const& m) override;
/** Send a set of PeerFinder endpoints as a protocol message. */
template <class FwdIt, class = typename std::enable_if_t<std::is_same<
typename std::iterator_traits<FwdIt>::value_type,
PeerFinder::Endpoint>::value>>
void
send_endpoints (FwdIt first, FwdIt last);
beast::IP::Endpoint
getRemoteAddress() const override;
@@ -246,7 +243,7 @@ public:
getShortId () const override;
RippleAddress const&
getNodePublic () const;
getNodePublic () const override;
Json::Value
json() override;
@@ -512,6 +509,31 @@ private:
//------------------------------------------------------------------------------
/** Serialize a range of PeerFinder::Endpoint into a TMEndpoints message
    and send it to this peer.
    The wire format only carries IPv4 addresses; non-v4 addresses are
    encoded as 0. Version is fixed at 1.
*/
template <class FwdIt, class>
void
PeerImp::send_endpoints (FwdIt first, FwdIt last)
{
    protocol::TMEndpoints tm;
    for (;first != last; ++first)
    {
        auto const& ep = *first;
        protocol::TMEndpoint& tme (*tm.add_endpoints());
        if (ep.address.is_v4())
            tme.mutable_ipv4()->set_ipv4(
                beast::toNetworkByteOrder (ep.address.to_v4().value));
        else
            tme.mutable_ipv4()->set_ipv4(0);
        tme.mutable_ipv4()->set_ipv4port (ep.address.port());
        tme.set_hops (ep.hops);
    }
    tm.set_version (1);
    send (std::make_shared <Message> (tm, protocol::mtENDPOINTS));
}
//------------------------------------------------------------------------------
// DEPRECATED
const boost::posix_time::seconds PeerImp::nodeVerifySeconds (15);
//------------------------------------------------------------------------------

View File

@@ -103,31 +103,6 @@ typedef std::vector <Endpoint> Endpoints;
//------------------------------------------------------------------------------
/** The Callback receives PeerFinder notifications.
The notifications are sent on a thread owned by the PeerFinder,
so it is best not to do too much work in here. Just post functor
to another worker thread or job queue and return.
*/
// DEPRECATED Callbacks only cause re-entrancy pain
struct Callback
{
    // Polymorphic interface: a virtual destructor ensures derived
    // implementations are destroyed correctly if deleted through a
    // Callback pointer.
    virtual ~Callback() = default;

    /** Initiate outgoing Peer connections to the specified set of endpoints. */
    virtual void connect (IPAddresses const& addresses) = 0;

    /** Activate the handshaked peer with the specified address. */
    virtual void activate (Slot::ptr const& slot) = 0;

    /** Sends a set of Endpoint records to the specified peer. */
    virtual void send (Slot::ptr const& slot, Endpoints const& endpoints) = 0;

    /** Disconnect the handshaked peer with the specified address.
        @param graceful `true` to wait for send buffers to drain before closing.
    */
    virtual void disconnect (Slot::ptr const& slot, bool graceful) = 0;
};
//------------------------------------------------------------------------------
/** Possible results from activating a slot. */
enum class Result
{
@@ -146,20 +121,16 @@ protected:
public:
/** Create a new Manager. */
static Manager* New (
Stoppable& parent,
SiteFiles::Manager& siteFiles,
static Manager* New (Stoppable& parent,
beast::File const& pathToDbFileOrDirectory,
Callback& callback,
clock_type& clock,
beast::Journal journal);
clock_type& clock, beast::Journal journal);
/** Destroy the object.
Any pending source fetch operations are aborted.
There may be some listener calls made before the
destructor returns.
*/
virtual ~Manager () { }
virtual ~Manager() = default;
/** Set the configuration for the manager.
The new settings will be applied asynchronously.
@@ -207,19 +178,6 @@ public:
virtual Slot::ptr new_outbound_slot (
beast::IP::Endpoint const& remote_endpoint) = 0;
/** Called when an outbound connection attempt succeeds.
The local endpoint must be valid. If the caller receives an error
when retrieving the local endpoint from the socket, it should
proceed as if the connection attempt failed by calling on_closed
instead of on_connected.
*/
virtual void on_connected (Slot::ptr const& slot,
beast::IP::Endpoint const& local_endpoint) = 0;
/** Called when a handshake is completed. */
virtual void on_handshake (Slot::ptr const& slot,
RipplePublicKey const& key, bool cluster) = 0;
/** Called when mtENDPOINTS is received. */
virtual void on_endpoints (Slot::ptr const& slot,
Endpoints const& endpoints) = 0;
@@ -233,11 +191,46 @@ public:
*/
virtual void on_closed (Slot::ptr const& slot) = 0;
/** Called when the slot is closed via canceling operations.
This is instead of on_closed.
*/
virtual void on_cancel (Slot::ptr const& slot) = 0;
//--------------------------------------------------------------------------
/** Called when an outbound connection attempt succeeds.
The local endpoint must be valid. If the caller receives an error
when retrieving the local endpoint from the socket, it should
proceed as if the connection attempt failed by calling on_closed
instead of on_connected.
@return `true` if the connection should be kept
*/
virtual
bool
connected (Slot::ptr const& slot,
beast::IP::Endpoint const& local_endpoint) = 0;
/** Request an active slot type. */
virtual
Result
activate (Slot::ptr const& slot,
RipplePublicKey const& key, bool cluster) = 0;
/** Returns a set of endpoints suitable for redirection. */
virtual
std::vector <Endpoint>
redirect (Slot::ptr const& slot) = 0;
/** Return a set of addresses we should connect to. */
virtual
std::vector <beast::IP::Endpoint>
autoconnect() = 0;
virtual
std::vector<std::pair<Slot::ptr, std::vector<Endpoint>>>
sendpeers() = 0;
/** Perform periodic activity.
This should be called once per second.
*/
virtual
void
once_per_second() = 0;
};
}

View File

@@ -1,181 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/peerfinder/impl/Checker.h>
#include <beast/asio/IPAddressConversion.h>
#include <beast/asio/placeholders.h>
#include <beast/asio/wrap_handler.h>
#include <beast/utility/LeakChecked.h>
#include <beast/smart_ptr/SharedObject.h>
#include <beast/smart_ptr/SharedPtr.h>
#include <beast/threads/Thread.h>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/optional.hpp>
namespace ripple {
namespace PeerFinder {
/** Concrete Checker that runs connectivity tests on a private thread.
    Owns its own io_service, kept alive by an io_service::work object and
    serviced by the inherited beast::Thread. Each async_test creates a
    self-owning Request whose lifetime is managed by intrusive reference
    counting (beast::SharedObject / SharedPtr).
*/
class CheckerImp
    : public Checker
    , private beast::Thread
    , private beast::LeakChecked <CheckerImp>
{
private:
    class Request;

    // Shared state: the intrusive list of in-flight requests, guarded by
    // beast::SharedData's internal lock.
    struct State
    {
        beast::List <Request> list;
    };

    typedef beast::SharedData <State> SharedState;

    SharedState m_state;
    boost::asio::io_service m_io_service;
    // Keeps io_service::run() from returning while requests are pending.
    boost::optional <boost::asio::io_service::work> m_work;

    //--------------------------------------------------------------------------

    /** A single connectivity test.
        Constructing a Request starts the async connect; the SharedPtr
        bound into the completion handler keeps the object alive until the
        handler is destroyed, at which point ~Request reports the Result.
    */
    class Request
        : public beast::SharedObject
        , public beast::List <Request>::Node
        , private beast::LeakChecked <Request>
    {
    public:
        typedef beast::SharedPtr <Request> Ptr;
        typedef boost::asio::ip::tcp Protocol;
        typedef boost::system::error_code error_code;
        typedef Protocol::socket socket_type;
        typedef Protocol::endpoint endpoint_type;

        CheckerImp& m_owner;
        boost::asio::io_service& m_io_service;
        beast::IP::Endpoint m_address;
        beast::asio::shared_handler <void (Result)> m_handler;
        socket_type m_socket;
        boost::system::error_code m_error;
        bool m_canAccept;

        Request (CheckerImp& owner, boost::asio::io_service& io_service,
            beast::IP::Endpoint const& address, beast::asio::shared_handler <
                void (Result)> const& handler)
            : m_owner (owner)
            , m_io_service (io_service)
            , m_address (address)
            , m_handler (handler)
            , m_socket (m_io_service)
            , m_canAccept (false)
        {
            // Register with the owner, then launch the connect. Binding
            // Ptr(this) into the handler holds a reference to this object.
            m_owner.add (*this);
            m_socket.async_connect (beast::IPAddressConversion::to_asio_endpoint (
                m_address), beast::asio::wrap_handler (std::bind (
                    &Request::handle_connect, Ptr(this),
                        beast::asio::placeholders::error), m_handler));
        }

        ~Request ()
        {
            // Report the outcome to the caller's handler.
            // NOTE(review): m_canAccept is set by handle_connect but never
            // copied into `result`, so callers always observe canAccept's
            // default (false) — confirm whether that is intentional.
            Result result;
            result.address = m_address;
            result.error = m_error;
            m_io_service.wrap (m_handler) (result);
            m_owner.remove (*this);
        }

        // Cancel the pending connect; the handler fires with
        // operation_aborted.
        void cancel ()
        {
            m_socket.cancel();
        }

        void handle_connect (boost::system::error_code ec)
        {
            m_error = ec;
            if (ec)
                return;
            m_canAccept = true;
        }
    };

    //--------------------------------------------------------------------------

    // Add a request to the in-flight list (thread-safe via SharedState).
    void add (Request& request)
    {
        SharedState::Access state (m_state);
        state->list.push_back (request);
    }

    // Remove a completed request from the in-flight list.
    void remove (Request& request)
    {
        SharedState::Access state (m_state);
        state->list.erase (state->list.iterator_to (request));
    }

    // Thread entry point (beast::Thread): service the private io_service.
    void run ()
    {
        m_io_service.run ();
    }

public:
    CheckerImp ()
        : Thread ("PeerFinder::Checker")
        , m_work (boost::in_place (std::ref (m_io_service)))
    {
        startThread ();
    }

    ~CheckerImp ()
    {
        // cancel pending i/o
        cancel();
        // destroy the io_service::work object
        m_work = boost::none;
        // signal and wait for the thread to exit gracefully
        stopThread ();
    }

    // Issue cancel orders for every in-flight request.
    void cancel ()
    {
        SharedState::Access state (m_state);
        for (beast::List <Request>::iterator iter (state->list.begin());
            iter != state->list.end(); ++iter)
            iter->cancel();
    }

    // Begin an async connectivity test; the Request manages its own
    // lifetime and invokes `handler` with the Result when done.
    void async_test (beast::IP::Endpoint const& endpoint,
        beast::asio::shared_handler <void (Result)> handler)
    {
        new Request (*this, m_io_service, endpoint, handler);
    }
};
//------------------------------------------------------------------------------
/** Factory: create the concrete Checker implementation.
    Caller owns the returned object.
*/
Checker* Checker::New ()
{
    return new CheckerImp;
}
}
}

View File

@@ -20,20 +20,79 @@
#ifndef RIPPLE_PEERFINDER_CHECKER_H_INCLUDED
#define RIPPLE_PEERFINDER_CHECKER_H_INCLUDED
#include <beast/asio/shared_handler.h>
#include <beast/asio/IPAddressConversion.h>
#include <beast/asio/placeholders.h>
#include <boost/asio/detail/handler_invoke_helpers.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/intrusive/list.hpp>
#include <boost/system/error_code.hpp>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <type_traits>
#include <utility>
namespace ripple {
namespace PeerFinder {
/** Tests remote listening sockets to make sure they are connectible. */
template <class Protocol = boost::asio::ip::tcp>
class Checker
{
private:
using error_code = boost::system::error_code;
struct basic_async_op : boost::intrusive::list_base_hook <
boost::intrusive::link_mode <boost::intrusive::normal_link>>
{
virtual
~basic_async_op() = default;
virtual
void
stop() = 0;
virtual
void
operator() (error_code const& ec) = 0;
};
template <class Handler>
struct async_op : basic_async_op
{
using socket_type = typename Protocol::socket;
using endpoint_type = typename Protocol::endpoint;
Checker& checker_;
socket_type socket_;
Handler handler_;
async_op (Checker& owner, boost::asio::io_service& io_service,
Handler&& handler);
~async_op();
void
stop() override;
void
operator() (error_code const& ec) override;
};
//--------------------------------------------------------------------------
using list_type = typename boost::intrusive::make_list <basic_async_op,
boost::intrusive::constant_time_size <true>>::type;
std::mutex mutex_;
std::condition_variable cond_;
boost::asio::io_service& io_service_;
list_type list_;
bool stop_ = false;
public:
/** Create the service.
This will automatically start the associated thread and io_service.
*/
static Checker* New ();
explicit
Checker (boost::asio::io_service& io_service);
/** Destroy the service.
Any pending I/O operations will be canceled. This call blocks until
@@ -41,40 +100,134 @@ public:
operation_aborted) and the associated thread and io_service have
no more work remaining.
*/
virtual ~Checker () { }
~Checker();
/** Cancel pending I/O.
/** Stop the service.
Pending I/O operations will be canceled.
This issues cancel orders for all pending I/O operations and then
returns immediately. Handlers will receive operation_aborted errors,
or if they were already queued they will complete normally.
*/
virtual void cancel () = 0;
void
stop();
struct Result
{
Result ()
: canAccept (false)
{ }
/** The original address. */
beast::IP::Endpoint address;
/** The error code from the operation. */
boost::system::error_code error;
/** `true` if the endpoint is reachable, else `false`.
Only defined if no error occurred.
*/
bool canAccept;
};
/** Block until all pending I/O completes. */
void
wait();
/** Performs an async connection test on the specified endpoint.
The port must be non-zero.
The port must be non-zero. Note that the execution guarantees
offered by asio handlers are NOT enforced.
*/
virtual void async_test (beast::IP::Endpoint const& endpoint,
beast::asio::shared_handler <void (Result)> handler) = 0;
template <class Handler>
void
async_connect (beast::IP::Endpoint const& endpoint, Handler&& handler);
private:
void
remove (basic_async_op& op);
};
//------------------------------------------------------------------------------
// Construct an operation bound to the checker's io_service; the handler
// is stored for invocation on completion.
template <class Protocol>
template <class Handler>
Checker<Protocol>::async_op<Handler>::async_op (Checker& owner,
    boost::asio::io_service& io_service, Handler&& handler)
    : checker_ (owner)
    , socket_ (io_service)
    , handler_ (std::forward<Handler>(handler))
{
}

// Deregister from the owning Checker; this is what eventually releases
// Checker::wait().
template <class Protocol>
template <class Handler>
Checker<Protocol>::async_op<Handler>::~async_op()
{
    checker_.remove (*this);
}

// Cancel the pending connect; the completion handler will be invoked
// with operation_aborted. Errors from cancel itself are ignored.
template <class Protocol>
template <class Handler>
void
Checker<Protocol>::async_op<Handler>::stop()
{
    error_code ec;
    socket_.cancel(ec);
}

// Completion: forward the error code to the user's handler.
template <class Protocol>
template <class Handler>
void
Checker<Protocol>::async_op<Handler>::operator() (
    error_code const& ec)
{
    handler_(ec);
}
//------------------------------------------------------------------------------
// The Checker borrows an externally owned io_service; it does not run it.
template <class Protocol>
Checker<Protocol>::Checker (boost::asio::io_service& io_service)
    : io_service_(io_service)
{
}

// Destruction blocks until every pending operation has completed
// (see wait()); callers should stop() first to cancel them.
template <class Protocol>
Checker<Protocol>::~Checker()
{
    wait();
}
// Idempotent stop: on the first call, cancel every in-flight operation.
// Their handlers will observe operation_aborted (or complete normally if
// already queued).
template <class Protocol>
void
Checker<Protocol>::stop()
{
    std::lock_guard<std::mutex> lock (mutex_);
    if (! stop_)
    {
        stop_ = true;
        for (auto& c : list_)
            c.stop();
    }
}
// Block the caller until the in-flight list drains; remove() signals the
// condition variable as each operation is destroyed.
template <class Protocol>
void
Checker<Protocol>::wait()
{
    std::unique_lock<std::mutex> lock (mutex_);
    while (! list_.empty())
        cond_.wait (lock);
}
/** Begin an asynchronous connectivity test to `endpoint`.
    The handler is invoked with the resulting error_code. Note that the
    execution guarantees offered by asio handlers are NOT enforced.
*/
template <class Protocol>
template <class Handler>
void
Checker<Protocol>::async_connect (
    beast::IP::Endpoint const& endpoint, Handler&& handler)
{
    // Store the handler by value. If Handler deduces to an lvalue
    // reference (caller passed an lvalue), async_op<Handler> would hold
    // `handler_` as a reference into the caller's frame, which can dangle
    // before the completion runs. Decaying the type forces a copy/move.
    using handler_type = std::decay_t<Handler>;
    auto const op = std::make_shared<async_op<handler_type>> (
        *this, io_service_,
        handler_type (std::forward<Handler>(handler)));
    {
        std::lock_guard<std::mutex> lock (mutex_);
        list_.push_back (*op);
    }
    // Binding the shared_ptr into the completion handler keeps the op
    // alive; ~async_op then removes it from list_ and signals wait().
    op->socket_.async_connect (beast::IPAddressConversion::to_asio_endpoint (
        endpoint), std::bind (&basic_async_op::operator(), op,
            beast::asio::placeholders::error));
}
// Called from ~async_op: unlink the finished operation and, when the
// list becomes empty, wake any threads blocked in wait().
template <class Protocol>
void
Checker<Protocol>::remove (basic_async_op& op)
{
    std::lock_guard <std::mutex> lock (mutex_);
    list_.erase (list_.iterator_to (op));
    if (list_.size() == 0)
        cond_.notify_all();
}
}
}

View File

@@ -1,105 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_PEERFINDER_CHECKERADAPTER_H_INCLUDED
#define RIPPLE_PEERFINDER_CHECKERADAPTER_H_INCLUDED
#include <beast/threads/ServiceQueue.h>
#include <beast/threads/ScopedWrapperContext.h>
#include <beast/utility/Debug.h>
#include <memory>
namespace ripple {
namespace PeerFinder {
// Ensures that all Logic member function entry points are
// called while holding a lock on the recursive mutex.
//
typedef beast::ScopedWrapperContext <
beast::RecursiveMutex, beast::RecursiveMutex::ScopedLockType> SerializedContext;
/** Adapts a ServiceQueue to dispatch Checker handler completions.
This lets the Logic have its Checker handler get dispatched
on the ServiceQueue instead of an io_service thread. Otherwise,
Logic would need a ServiceQueue to dispatch from its handler.
*/
/** Checker decorator that re-dispatches handler completions.
    Wraps a concrete Checker (created via Checker::New) so that completion
    handlers are posted through a beast::ServiceQueue rather than running
    on the Checker's own io_service thread.
*/
class CheckerAdapter : public Checker
{
private:
    SerializedContext& m_context;
    beast::ServiceQueue& m_queue;
    // The wrapped implementation; owned and destroyed first (see dtor).
    std::unique_ptr <Checker> m_checker;

    // Completion shim: forwards the Result through the ServiceQueue.
    struct Handler
    {
        SerializedContext& m_context;
        beast::ServiceQueue& m_queue;
        beast::asio::shared_handler <void (Checker::Result)> m_handler;

        Handler (
            SerializedContext& context,
            beast::ServiceQueue& queue,
            beast::asio::shared_handler <void (Checker::Result)> const& handler)
            : m_context (context)
            , m_queue (queue)
            , m_handler (handler)
        { }

        void operator() (Checker::Result result)
        {
            // VFALCO TODO Fix this, it is surely wrong but
            //        this supposedly correct line doesn't compile
            //m_queue.wrap (m_context.wrap (m_handler)) (result);
            // WRONG: does not serialize the handler through m_context.
            m_queue.wrap (m_handler) (result);
        }
    };

public:
    CheckerAdapter (SerializedContext& context, beast::ServiceQueue& queue)
        : m_context (context)
        , m_queue (queue)
        , m_checker (Checker::New())
    {
    }

    ~CheckerAdapter ()
    {
        // Have to do this before other fields get destroyed
        m_checker = nullptr;
    }

    // Forward cancellation to the wrapped Checker.
    void cancel ()
    {
        m_checker->cancel();
    }

    // Start a test; the wrapped Checker invokes our Handler shim, which
    // re-dispatches the user's handler on the ServiceQueue.
    void async_test (beast::IP::Endpoint const& endpoint,
        beast::asio::shared_handler <void (Checker::Result)> handler)
    {
        m_checker->async_test (endpoint, Handler (
            m_context, m_queue, handler));
    }
};
}
}
#endif

View File

@@ -116,6 +116,11 @@ public:
return slot_;
}
std::vector <Endpoint>& list()
{
return list_;
}
std::vector <Endpoint> const& list() const
{
return list_;

View File

@@ -28,9 +28,8 @@
#include <ripple/peerfinder/impl/Reporting.h>
#include <ripple/peerfinder/impl/SlotImp.h>
#include <ripple/peerfinder/impl/Source.h>
#include <beast/container/aged_container_utility.h>
#include <beast/smart_ptr/SharedPtr.h>
#include <functional>
#include <map>
@@ -41,6 +40,7 @@ namespace PeerFinder {
We keep this in a separate class so it can be instantiated
for unit tests.
*/
template <class Checker>
class Logic
{
public:
@@ -113,7 +113,6 @@ public:
beast::Journal m_journal;
SharedState m_state;
clock_type& m_clock;
Callback& m_callback;
Store& m_store;
Checker& m_checker;
@@ -126,29 +125,24 @@ public:
//--------------------------------------------------------------------------
Logic (
clock_type& clock,
Callback& callback,
Store& store,
Checker& checker,
beast::Journal journal)
Logic (clock_type& clock, Store& store,
Checker& checker, beast::Journal journal)
: m_journal (journal, Reporting::logic)
, m_state (&store, std::ref (clock), journal)
, m_clock (clock)
, m_callback (callback)
, m_store (store)
, m_checker (checker)
, m_whenBroadcast (m_clock.now())
, m_squelches (m_clock)
{
setConfig (Config ());
config ({});
}
// Load persistent state information from the Store
//
void load ()
{
SharedState::Access state (m_state);
typename SharedState::Access state (m_state);
state->bootcache.load ();
}
@@ -161,7 +155,7 @@ public:
*/
void stop ()
{
SharedState::Access state (m_state);
typename SharedState::Access state (m_state);
state->stopping = true;
if (state->fetchSource != nullptr)
state->fetchSource->cancel ();
@@ -173,17 +167,19 @@ public:
//
//--------------------------------------------------------------------------
void setConfig (Config const& config)
void
config (Config const& c)
{
SharedState::Access state (m_state);
state->config = config;
typename SharedState::Access state (m_state);
state->config = c;
state->counts.onConfig (state->config);
}
void addFixedPeer (std::string const& name,
void
addFixedPeer (std::string const& name,
std::vector <beast::IP::Endpoint> const& addresses)
{
SharedState::Access state (m_state);
typename SharedState::Access state (m_state);
if (addresses.empty ())
{
@@ -213,12 +209,12 @@ public:
// Called when the Checker completes a connectivity test
void checkComplete (beast::IP::Endpoint const& remoteAddress,
beast::IP::Endpoint const& checkedAddress,
Checker::Result const& result)
boost::system::error_code ec)
{
if (result.error == boost::asio::error::operation_aborted)
if (ec == boost::asio::error::operation_aborted)
return;
SharedState::Access state (m_state);
typename SharedState::Access state (m_state);
Slots::iterator const iter (state->slots.find (remoteAddress));
SlotImp& slot (*iter->second);
@@ -231,38 +227,23 @@ public:
return;
}
// Mark that a check for this slot is finished.
slot.checked = true;
slot.connectivityCheckInProgress = false;
if (! result.error)
{
slot.checked = true;
slot.canAccept = result.canAccept;
if (slot.canAccept)
{
if (m_journal.debug) m_journal.debug << beast::leftw (18) <<
"Logic testing " << checkedAddress << " succeeded";
}
else
{
if (m_journal.info) m_journal.info << beast::leftw (18) <<
"Logic testing " << checkedAddress << " failed";
}
}
else
if (ec)
{
// VFALCO TODO Should we retry depending on the error?
slot.checked = true;
slot.canAccept = false;
if (m_journal.error) m_journal.error << beast::leftw (18) <<
"Logic testing " << iter->first << " with error, " <<
result.error.message();
ec.message();
state->bootcache.on_failure (checkedAddress);
return;
}
if (! slot.canAccept)
state->bootcache.on_failure (checkedAddress);
slot.canAccept = true;
if (m_journal.debug) m_journal.debug << beast::leftw (18) <<
"Logic testing " << checkedAddress << " succeeded";
}
//--------------------------------------------------------------------------
@@ -274,7 +255,7 @@ public:
"Logic accept" << remote_endpoint <<
" on local " << local_endpoint;
SharedState::Access state (m_state);
typename SharedState::Access state (m_state);
// Check for duplicate connection
{
@@ -346,7 +327,7 @@ public:
if (m_journal.debug) m_journal.debug << beast::leftw (18) <<
"Logic connect " << remote_endpoint;
SharedState::Access state (m_state);
typename SharedState::Access state (m_state);
// Check for duplicate connection
if (state->slots.find (remote_endpoint) !=
@@ -378,14 +359,15 @@ public:
return result.first->second;
}
void on_connected (SlotImp::ptr const& slot,
bool
connected (SlotImp::ptr const& slot,
beast::IP::Endpoint const& local_endpoint)
{
if (m_journal.trace) m_journal.trace << beast::leftw (18) <<
"Logic connected" << slot->remote_endpoint () <<
" on local " << local_endpoint;
SharedState::Access state (m_state);
typename SharedState::Access state (m_state);
// The object must exist in our table
assert (state->slots.find (slot->remote_endpoint ()) !=
@@ -403,8 +385,7 @@ public:
if (m_journal.warning) m_journal.warning << beast::leftw (18) <<
"Logic dropping " << slot->remote_endpoint () <<
" as self connect";
m_callback.disconnect (slot, false);
return;
return false;
}
}
@@ -412,16 +393,18 @@ public:
state->counts.remove (*slot);
slot->state (Slot::connected);
state->counts.add (*slot);
return true;
}
void on_handshake (SlotImp::ptr const& slot,
Result
activate (SlotImp::ptr const& slot,
RipplePublicKey const& key, bool cluster)
{
if (m_journal.debug) m_journal.debug << beast::leftw (18) <<
"Logic handshake " << slot->remote_endpoint () <<
" with " << (cluster ? "clustered " : "") << "key " << key;
SharedState::Access state (m_state);
typename SharedState::Access state (m_state);
// The object must exist in our table
assert (state->slots.find (slot->remote_endpoint ()) !=
@@ -432,14 +415,17 @@ public:
// Check for duplicate connection by key
if (state->keys.find (key) != state->keys.end())
{
m_callback.disconnect (slot, true);
return;
}
return Result::duplicate;
// See if we have an open space for this slot
if (state->counts.can_activate (*slot))
if (! state->counts.can_activate (*slot))
{
if (! slot->inbound())
state->bootcache.on_success (
slot->remote_endpoint());
return Result::full;
}
// Set key and cluster right before adding to the map
// otherwise we could assert later when erasing the key.
state->counts.remove (*slot);
@@ -452,7 +438,6 @@ public:
state->keys.insert (key));
// Public key must not already exist
assert (result.second);
(void) result.second;
// Change state and update counts
state->counts.remove (*slot);
@@ -473,27 +458,239 @@ public:
"Logic fixed " << slot->remote_endpoint () << " success";
}
m_callback.activate (slot);
return Result::success;
}
else
/** Return a list of addresses suitable for redirection.
This is a legacy function, redirects should be returned in
the HTTP handshake and not via TMEndpoints.
*/
std::vector <Endpoint>
redirect (SlotImp::ptr const& slot)
{
if (! slot->inbound())
state->bootcache.on_success (
slot->remote_endpoint());
// Maybe give them some addresses to try
if (slot->inbound ())
redirect (slot, state);
m_callback.disconnect (slot, true);
typename SharedState::Access state (m_state);
RedirectHandouts h (slot);
state->livecache.hops.shuffle();
handout (&h, (&h)+1,
state->livecache.hops.begin(),
state->livecache.hops.end());
return std::move(h.list());
}
/** Create new outbound connection attempts as needed.
This implements PeerFinder's "Outbound Connection Strategy"
*/
std::vector <beast::IP::Endpoint>
autoconnect()
{
std::vector <beast::IP::Endpoint> const none;
typename SharedState::Access state (m_state);
// Count how many more outbound attempts to make
//
auto needed (state->counts.attempts_needed ());
if (needed == 0)
return none;
ConnectHandouts h (needed, m_squelches);
// Make sure we don't connect to already-connected entries.
squelch_slots (state);
// 1. Use Fixed if:
// Fixed active count is below fixed count AND
// ( There are eligible fixed addresses to try OR
// Any outbound attempts are in progress)
//
if (state->counts.fixed_active() < state->fixed.size ())
{
get_fixed (needed, h.list(), state);
if (! h.list().empty ())
{
if (m_journal.debug) m_journal.debug << beast::leftw (18) <<
"Logic connect " << h.list().size() << " fixed";
return h.list();
}
if (state->counts.attempts() > 0)
{
if (m_journal.debug) m_journal.debug << beast::leftw (18) <<
"Logic waiting on " <<
state->counts.attempts() << " attempts";
return none;
}
}
// Only proceed if auto connect is enabled and we
// have less than the desired number of outbound slots
//
if (! state->config.autoConnect ||
state->counts.out_active () >= state->counts.out_max ())
return none;
// 2. Use Livecache if:
// There are any entries in the cache OR
// Any outbound attempts are in progress
//
{
state->livecache.hops.shuffle();
handout (&h, (&h)+1,
state->livecache.hops.rbegin(),
state->livecache.hops.rend());
if (! h.list().empty ())
{
if (m_journal.debug) m_journal.debug << beast::leftw (18) <<
"Logic connect " << h.list().size () << " live " <<
((h.list().size () > 1) ? "endpoints" : "endpoint");
return h.list();
}
else if (state->counts.attempts() > 0)
{
if (m_journal.debug) m_journal.debug << beast::leftw (18) <<
"Logic waiting on " <<
state->counts.attempts() << " attempts";
return none;
}
}
/* 3. Bootcache refill
If the Bootcache is empty, try to get addresses from the current
set of Sources and add them into the Bootstrap cache.
Pseudocode:
If ( domainNames.count() > 0 AND (
unusedBootstrapIPs.count() == 0
OR activeNameResolutions.count() > 0) )
ForOneOrMore (DomainName that hasn't been resolved recently)
Contact DomainName and add entries to the unusedBootstrapIPs
return;
*/
// 4. Use Bootcache if:
// There are any entries we haven't tried lately
//
for (auto iter (state->bootcache.begin());
! h.full() && iter != state->bootcache.end(); ++iter)
h.try_insert (*iter);
if (! h.list().empty ())
{
if (m_journal.debug) m_journal.debug << beast::leftw (18) <<
"Logic connect " << h.list().size () << " boot " <<
((h.list().size () > 1) ? "addresses" : "address");
return h.list();
}
// If we get here we are stuck
return none;
}
std::vector<std::pair<Slot::ptr, std::vector<Endpoint>>>
sendpeers()
{
std::vector<std::pair<Slot::ptr, std::vector<Endpoint>>> result;
typename SharedState::Access state (m_state);
clock_type::time_point const now = m_clock.now();
if (m_whenBroadcast <= now)
{
std::vector <SlotHandouts> targets;
{
// build list of active slots
std::vector <SlotImp::ptr> slots;
slots.reserve (state->slots.size());
std::for_each (state->slots.cbegin(), state->slots.cend(),
[&slots](Slots::value_type const& value)
{
if (value.second->state() == Slot::active)
slots.emplace_back (value.second);
});
std::random_shuffle (slots.begin(), slots.end());
// build target vector
targets.reserve (slots.size());
std::for_each (slots.cbegin(), slots.cend(),
[&targets](SlotImp::ptr const& slot)
{
targets.emplace_back (slot);
});
}
/* VFALCO NOTE
This is a temporary measure. Once we know our own IP
address, the correct solution is to put it into the Livecache
at hops 0, and go through the regular handout path. This way
we avoid handing our address out too frequenty, which this code
suffers from.
*/
// Add an entry for ourselves if:
// 1. We want incoming
// 2. We have slots
// 3. We haven't failed the firewalled test
//
if (state->config.wantIncoming &&
state->counts.inboundSlots() > 0)
{
Endpoint ep;
ep.hops = 0;
ep.address = beast::IP::Endpoint (
beast::IP::AddressV4 ()).at_port (
state->config.listeningPort);
for (auto& t : targets)
t.insert (ep);
}
// build sequence of endpoints by hops
state->livecache.hops.shuffle();
handout (targets.begin(), targets.end(),
state->livecache.hops.begin(),
state->livecache.hops.end());
// broadcast
for (auto const& t : targets)
{
SlotImp::ptr const& slot = t.slot();
auto const& list = t.list();
if (m_journal.trace) m_journal.trace << beast::leftw (18) <<
"Logic sending " << slot->remote_endpoint() <<
" with " << list.size() <<
((list.size() == 1) ? " endpoint" : " endpoints");
result.push_back (std::make_pair (slot, list));
}
m_whenBroadcast = now + Tuning::secondsPerMessage;
}
return result;
}
void once_per_second()
{
typename SharedState::Access state (m_state);
// Expire the Livecache
state->livecache.expire ();
// Expire the recent cache in each slot
for (auto const& entry : state->slots)
entry.second->expire();
// Expire the recent attempts table
beast::expire (m_squelches,
Tuning::recentAttemptDuration);
state->bootcache.periodicActivity ();
}
//--------------------------------------------------------------------------
// Validate and clean up the list that we received from the slot.
void preprocess (SlotImp::ptr const& slot, Endpoints& list,
SharedState::Access& state)
typename SharedState::Access& state)
{
bool neighbor (false);
for (auto iter (list.begin()); iter != list.end();)
@@ -570,7 +767,7 @@ public:
" contained " << list.size () <<
((list.size() > 1) ? " entries" : " entry");
SharedState::Access state (m_state);
typename SharedState::Access state (m_state);
// The object must exist in our table
assert (state->slots.find (slot->remote_endpoint ()) !=
@@ -583,10 +780,8 @@ public:
clock_type::time_point const now (m_clock.now());
for (auto iter (list.cbegin()); iter != list.cend(); ++iter)
for (auto const& ep : list)
{
Endpoint const& ep (*iter);
assert (ep.hops != 0);
slot->recent.insert (ep.address, ep.hops);
@@ -611,8 +806,8 @@ public:
// Test the slot's listening port before
// adding it to the livecache for the first time.
//
m_checker.async_test (ep.address, std::bind (
&Logic::checkComplete, this, slot->remote_endpoint (),
m_checker.async_connect (ep.address, std::bind (
&Logic::checkComplete, this, slot->remote_endpoint(),
ep.address, std::placeholders::_1));
// Note that we simply discard the first Endpoint
@@ -644,13 +839,13 @@ public:
void on_legacy_endpoints (IPAddresses const& list)
{
// Ignoring them also seems a valid choice.
SharedState::Access state (m_state);
typename SharedState::Access state (m_state);
for (IPAddresses::const_iterator iter (list.begin());
iter != list.end(); ++iter)
state->bootcache.insert (*iter);
}
void remove (SlotImp::ptr const& slot, SharedState::Access& state)
void remove (SlotImp::ptr const& slot, typename SharedState::Access& state)
{
Slots::iterator const iter (state->slots.find (
slot->remote_endpoint ()));
@@ -681,7 +876,7 @@ public:
void on_closed (SlotImp::ptr const& slot)
{
SharedState::Access state (m_state);
typename SharedState::Access state (m_state);
remove (slot, state);
@@ -728,20 +923,10 @@ public:
}
}
void on_cancel (SlotImp::ptr const& slot)
{
SharedState::Access state (m_state);
remove (slot, state);
if (m_journal.trace) m_journal.trace << beast::leftw (18) <<
"Logic cancel " << slot->remote_endpoint();
}
//--------------------------------------------------------------------------
// Returns `true` if the address matches a fixed slot address
bool fixed (beast::IP::Endpoint const& endpoint, SharedState::Access& state) const
bool fixed (beast::IP::Endpoint const& endpoint, typename SharedState::Access& state) const
{
for (auto const& entry : state->fixed)
if (entry.first == endpoint)
@@ -751,7 +936,7 @@ public:
// Returns `true` if the address matches a fixed slot address
// Note that this does not use the port information in the IP::Endpoint
bool fixed (beast::IP::Address const& address, SharedState::Access& state) const
bool fixed (beast::IP::Address const& address, typename SharedState::Access& state) const
{
for (auto const& entry : state->fixed)
if (entry.first.address () == address)
@@ -768,7 +953,7 @@ public:
/** Adds eligible Fixed addresses for outbound attempts. */
template <class Container>
void get_fixed (std::size_t needed, Container& c,
SharedState::Access& state)
typename SharedState::Access& state)
{
auto const now (m_clock.now());
for (auto iter = state->fixed.begin ();
@@ -791,7 +976,7 @@ public:
//--------------------------------------------------------------------------
// Adds slot addresses to the squelched set
void squelch_slots (SharedState::Access& state)
void squelch_slots (typename SharedState::Access& state)
{
for (auto const& s : state->slots)
{
@@ -802,164 +987,20 @@ public:
}
}
/** Create new outbound connection attempts as needed.
This implements PeerFinder's "Outbound Connection Strategy"
*/
void autoconnect ()
{
SharedState::Access state (m_state);
// Count how many more outbound attempts to make
//
auto needed (state->counts.attempts_needed ());
if (needed == 0)
return;
ConnectHandouts h (needed, m_squelches);
// Make sure we don't connect to already-connected entries.
squelch_slots (state);
// 1. Use Fixed if:
// Fixed active count is below fixed count AND
// ( There are eligible fixed addresses to try OR
// Any outbound attempts are in progress)
//
if (state->counts.fixed_active() < state->fixed.size ())
{
get_fixed (needed, h.list(), state);
if (! h.list().empty ())
{
if (m_journal.debug) m_journal.debug << beast::leftw (18) <<
"Logic connect " << h.list().size() << " fixed";
m_callback.connect (h.list());
return;
}
if (state->counts.attempts() > 0)
{
if (m_journal.debug) m_journal.debug << beast::leftw (18) <<
"Logic waiting on " <<
state->counts.attempts() << " attempts";
return;
}
}
// Only proceed if auto connect is enabled and we
// have less than the desired number of outbound slots
//
if (! state->config.autoConnect ||
state->counts.out_active () >= state->counts.out_max ())
return;
// 2. Use Livecache if:
// There are any entries in the cache OR
// Any outbound attempts are in progress
//
{
state->livecache.hops.shuffle();
handout (&h, (&h)+1,
state->livecache.hops.rbegin(),
state->livecache.hops.rend());
if (! h.list().empty ())
{
if (m_journal.debug) m_journal.debug << beast::leftw (18) <<
"Logic connect " << h.list().size () << " live " <<
((h.list().size () > 1) ? "endpoints" : "endpoint");
m_callback.connect (h.list());
return;
}
else if (state->counts.attempts() > 0)
{
if (m_journal.debug) m_journal.debug << beast::leftw (18) <<
"Logic waiting on " <<
state->counts.attempts() << " attempts";
return;
}
}
/* 3. Bootcache refill
If the Bootcache is empty, try to get addresses from the current
set of Sources and add them into the Bootstrap cache.
Pseudocode:
If ( domainNames.count() > 0 AND (
unusedBootstrapIPs.count() == 0
OR activeNameResolutions.count() > 0) )
ForOneOrMore (DomainName that hasn't been resolved recently)
Contact DomainName and add entries to the unusedBootstrapIPs
return;
*/
// 4. Use Bootcache if:
// There are any entries we haven't tried lately
//
for (auto iter (state->bootcache.begin());
! h.full() && iter != state->bootcache.end(); ++iter)
h.try_insert (*iter);
if (! h.list().empty ())
{
if (m_journal.debug) m_journal.debug << beast::leftw (18) <<
"Logic connect " << h.list().size () << " boot " <<
((h.list().size () > 1) ? "addresses" : "address");
m_callback.connect (h.list());
return;
}
// If we get here we are stuck
}
//--------------------------------------------------------------------------
void addStaticSource (beast::SharedPtr <Source> const& source)
void
addStaticSource (beast::SharedPtr <Source> const& source)
{
fetch (source);
}
void addSource (beast::SharedPtr <Source> const& source)
void
addSource (beast::SharedPtr <Source> const& source)
{
m_sources.push_back (source);
}
//--------------------------------------------------------------------------
// Called periodically to sweep the livecache and remove aged out items.
void expire ()
{
SharedState::Access state (m_state);
// Expire the Livecache
state->livecache.expire ();
// Expire the recent cache in each slot
for (auto const& entry : state->slots)
entry.second->expire();
// Expire the recent attempts table
beast::expire (m_squelches,
Tuning::recentAttemptDuration);
}
// Called every so often to perform periodic tasks.
void periodicActivity ()
{
SharedState::Access state (m_state);
clock_type::time_point const now (m_clock.now());
autoconnect ();
expire ();
state->bootcache.periodicActivity ();
if (m_whenBroadcast <= now)
{
broadcast ();
m_whenBroadcast = now + Tuning::secondsPerMessage;
}
}
//--------------------------------------------------------------------------
//
// Bootcache livecache sources
@@ -970,7 +1011,7 @@ public:
// Returns `true` if the address is new.
//
bool addBootcacheAddress (beast::IP::Endpoint const& address,
SharedState::Access& state)
typename SharedState::Access& state)
{
return state->bootcache.insert (address);
}
@@ -981,7 +1022,7 @@ public:
int addBootcacheAddresses (IPAddresses const& list)
{
int count (0);
SharedState::Access state (m_state);
typename SharedState::Access state (m_state);
for (auto addr : list)
{
if (addBootcacheAddress (addr, state))
@@ -997,7 +1038,7 @@ public:
{
{
SharedState::Access state (m_state);
typename SharedState::Access state (m_state);
if (state->stopping)
return;
state->fetchSource = source;
@@ -1009,7 +1050,7 @@ public:
source->fetch (results, m_journal);
{
SharedState::Access state (m_state);
typename SharedState::Access state (m_state);
if (state->stopping)
return;
state->fetchSource = nullptr;
@@ -1050,103 +1091,6 @@ public:
return true;
}
//--------------------------------------------------------------------------
// Gives a slot a set of addresses to try next since we're full
void redirect (SlotImp::ptr const& slot, SharedState::Access& state)
{
RedirectHandouts h (slot);
state->livecache.hops.shuffle();
handout (&h, (&h)+1,
state->livecache.hops.begin(),
state->livecache.hops.end());
if (! h.list().empty ())
{
if (m_journal.trace) m_journal.trace << beast::leftw (18) <<
"Logic redirect " << slot->remote_endpoint() <<
" with " << h.list().size() <<
((h.list().size() == 1) ? " address" : " addresses");
m_callback.send (slot, h.list());
}
else
{
if (m_journal.warning) m_journal.warning << beast::leftw (18) <<
"Logic deferred " << slot->remote_endpoint();
}
}
// Send mtENDPOINTS for each slot as needed
void broadcast ()
{
SharedState::Access state (m_state);
std::vector <SlotHandouts> targets;
{
// build list of active slots
std::vector <SlotImp::ptr> slots;
slots.reserve (state->slots.size());
std::for_each (state->slots.cbegin(), state->slots.cend(),
[&slots](Slots::value_type const& value)
{
if (value.second->state() == Slot::active)
slots.emplace_back (value.second);
});
std::random_shuffle (slots.begin(), slots.end());
// build target vector
targets.reserve (slots.size());
std::for_each (slots.cbegin(), slots.cend(),
[&targets](SlotImp::ptr const& slot)
{
targets.emplace_back (slot);
});
}
/* VFALCO NOTE
This is a temporary measure. Once we know our own IP
address, the correct solution is to put it into the Livecache
at hops 0, and go through the regular handout path. This way
we avoid handing our address out too frequenty, which this code
suffers from.
*/
// Add an entry for ourselves if:
// 1. We want incoming
// 2. We have slots
// 3. We haven't failed the firewalled test
//
if (state->config.wantIncoming &&
state->counts.inboundSlots() > 0)
{
Endpoint ep;
ep.hops = 0;
ep.address = beast::IP::Endpoint (
beast::IP::AddressV4 ()).at_port (
state->config.listeningPort);
for (auto& t : targets)
t.insert (ep);
}
// build sequence of endpoints by hops
state->livecache.hops.shuffle();
handout (targets.begin(), targets.end(),
state->livecache.hops.begin(),
state->livecache.hops.end());
// broadcast
for (auto const& t : targets)
{
SlotImp::ptr const& slot (t.slot());
auto const& list (t.list());
if (m_journal.trace) m_journal.trace << beast::leftw (18) <<
"Logic sending " << slot->remote_endpoint() <<
" with " << list.size() <<
((list.size() == 1) ? " endpoint" : " endpoints");
m_callback.send (slot, list);
}
}
//--------------------------------------------------------------------------
//
// PropertyStream
@@ -1175,7 +1119,7 @@ public:
void onWrite (beast::PropertyStream::Map& map)
{
SharedState::Access state (m_state);
typename SharedState::Access state (m_state);
// VFALCO NOTE These ugly casts are needed because
// of how std::size_t is declared on some linuxes
@@ -1217,12 +1161,12 @@ public:
State const& state () const
{
return *SharedState::ConstAccess (m_state);
return *typename SharedState::ConstAccess (m_state);
}
Counts const& counts () const
{
return SharedState::ConstAccess (m_state)->counts;
return typename SharedState::ConstAccess (m_state)->counts;
}
static std::string stateString (Slot::State state)
@@ -1245,19 +1189,3 @@ public:
}
#endif
/*
- recent tables entries should last equal to the cache time to live
- never send a slot a message thats in its recent table at a lower hop count
- when sending a message to a slot, add it to its recent table at one lower hop count
Giveaway logic
When do we give away?
- To one inbound connection when we redirect due to full
- To all slots at every broadcast
*/

View File

@@ -18,11 +18,13 @@
//==============================================================================
#include <ripple/peerfinder/Manager.h>
#include <ripple/peerfinder/impl/CheckerAdapter.h>
#include <ripple/peerfinder/impl/Checker.h>
#include <ripple/peerfinder/impl/Logic.h>
#include <ripple/peerfinder/impl/SourceStrings.h>
#include <ripple/peerfinder/impl/StoreSqdb.h>
#include <beast/module/core/thread/DeadlineTimer.h>
#include <boost/asio/io_service.hpp>
#include <boost/optional.hpp>
#include <thread>
#if DOXYGEN
#include <ripple/peerfinder/README.md>
@@ -33,50 +35,56 @@ namespace PeerFinder {
class ManagerImp
: public Manager
, public beast::Thread
, public SiteFiles::Listener
, public beast::DeadlineTimer::Listener
, public beast::LeakChecked <ManagerImp>
{
public:
beast::ServiceQueue m_queue;
SiteFiles::Manager& m_siteFiles;
beast::File m_databaseFile;
clock_type& m_clock;
beast::Journal m_journal;
StoreSqdb m_store;
SerializedContext m_context;
CheckerAdapter m_checker;
Logic m_logic;
beast::DeadlineTimer m_secondsTimer;
Checker<boost::asio::ip::tcp> checker_;
Logic <decltype(checker_)> m_logic;
// Temporary
std::thread thread_;
boost::asio::io_service io_service_;
boost::optional <boost::asio::io_service::work> work_;
//--------------------------------------------------------------------------
ManagerImp (
Stoppable& stoppable,
SiteFiles::Manager& siteFiles,
beast::File const& pathToDbFileOrDirectory,
Callback& callback,
clock_type& clock,
beast::Journal journal)
: Manager (stoppable)
, Thread ("PeerFinder")
, m_siteFiles (siteFiles)
, m_databaseFile (pathToDbFileOrDirectory)
, m_clock (clock)
, m_journal (journal)
, m_store (journal)
, m_checker (m_context, m_queue)
, m_logic (clock, callback, m_store, m_checker, journal)
, m_secondsTimer (this)
, checker_ (io_service_)
, m_logic (clock, m_store, checker_, journal)
{
if (m_databaseFile.isDirectory ())
m_databaseFile = m_databaseFile.getChildFile("peerfinder.sqlite");
work_ = boost::in_place (std::ref(io_service_));
thread_ = std::thread ([&]() { this->io_service_.run(); });
}
~ManagerImp ()
~ManagerImp()
{
stopThread ();
stop();
}
void
stop()
{
if (thread_.joinable())
{
work_ = boost::none;
thread_.join();
}
}
//--------------------------------------------------------------------------
@@ -87,28 +95,20 @@ public:
void setConfig (Config const& config)
{
m_queue.dispatch (
m_context.wrap (
std::bind (&Logic::setConfig, &m_logic,
config)));
m_logic.config (config);
}
void addFixedPeer (std::string const& name,
std::vector <beast::IP::Endpoint> const& addresses)
{
m_queue.dispatch (
m_context.wrap (
std::bind (&Logic::addFixedPeer, &m_logic,
name, addresses)));
m_logic.addFixedPeer (name, addresses);
}
void addFallbackStrings (std::string const& name,
void
addFallbackStrings (std::string const& name,
std::vector <std::string> const& strings)
{
m_queue.dispatch (
m_context.wrap (
std::bind (&Logic::addStaticSource, &m_logic,
SourceStrings::New (name, strings))));
m_logic.addStaticSource (SourceStrings::New (name, strings));
}
void addFallbackURL (std::string const& name, std::string const& url)
@@ -118,107 +118,81 @@ public:
//--------------------------------------------------------------------------
Slot::ptr new_inbound_slot (
Slot::ptr
new_inbound_slot (
beast::IP::Endpoint const& local_endpoint,
beast::IP::Endpoint const& remote_endpoint)
{
return m_logic.new_inbound_slot (local_endpoint, remote_endpoint);
}
Slot::ptr new_outbound_slot (beast::IP::Endpoint const& remote_endpoint)
Slot::ptr
new_outbound_slot (beast::IP::Endpoint const& remote_endpoint)
{
return m_logic.new_outbound_slot (remote_endpoint);
}
void on_connected (Slot::ptr const& slot,
beast::IP::Endpoint const& local_endpoint)
{
SlotImp::ptr impl (std::dynamic_pointer_cast <SlotImp> (slot));
m_logic.on_connected (impl, local_endpoint);
}
void on_handshake (Slot::ptr const& slot,
RipplePublicKey const& key, bool cluster)
{
SlotImp::ptr impl (std::dynamic_pointer_cast <SlotImp> (slot));
m_logic.on_handshake (impl, key, cluster);
}
void on_endpoints (Slot::ptr const& slot,
Endpoints const& endpoints)
void
on_endpoints (Slot::ptr const& slot, Endpoints const& endpoints)
{
SlotImp::ptr impl (std::dynamic_pointer_cast <SlotImp> (slot));
m_logic.on_endpoints (impl, endpoints);
}
void on_legacy_endpoints (IPAddresses const& addresses)
void
on_legacy_endpoints (IPAddresses const& addresses)
{
m_logic.on_legacy_endpoints (addresses);
}
void on_closed (Slot::ptr const& slot)
void
on_closed (Slot::ptr const& slot)
{
SlotImp::ptr impl (std::dynamic_pointer_cast <SlotImp> (slot));
m_logic.on_closed (impl);
}
void on_cancel (Slot::ptr const& slot)
//--------------------------------------------------------------------------
bool
connected (Slot::ptr const& slot,
beast::IP::Endpoint const& local_endpoint) override
{
SlotImp::ptr impl (std::dynamic_pointer_cast <SlotImp> (slot));
m_logic.on_cancel (impl);
return m_logic.connected (impl, local_endpoint);
}
//--------------------------------------------------------------------------
//
// SiteFiles
//
//--------------------------------------------------------------------------
void parseBootstrapIPs (std::string const& name, SiteFiles::Section const& section)
Result
activate (Slot::ptr const& slot,
RipplePublicKey const& key, bool cluster) override
{
std::size_t n (0);
for (SiteFiles::Section::DataType::const_iterator iter (
section.data().begin()); iter != section.data().end(); ++iter)
{
std::string const& s (*iter);
beast::IP::Endpoint addr (beast::IP::Endpoint::from_string (s));
if (is_unspecified (addr))
addr = beast::IP::Endpoint::from_string_altform(s);
if (! is_unspecified (addr))
{
// add IP::Endpoint to bootstrap cache
++n;
}
SlotImp::ptr impl (std::dynamic_pointer_cast <SlotImp> (slot));
return m_logic.activate (impl, key, cluster);
}
m_journal.info <<
"Added " << n <<
" bootstrap IPs from " << name;
std::vector <Endpoint>
redirect (Slot::ptr const& slot) override
{
SlotImp::ptr impl (std::dynamic_pointer_cast <SlotImp> (slot));
return m_logic.redirect (impl);
}
void parseFixedIPs (SiteFiles::Section const& section)
std::vector <beast::IP::Endpoint>
autoconnect() override
{
for (SiteFiles::Section::DataType::const_iterator iter (
section.data().begin()); iter != section.data().end(); ++iter)
{
std::string const& s (*iter);
beast::IP::Endpoint addr (beast::IP::Endpoint::from_string (s));
if (is_unspecified (addr))
addr = beast::IP::Endpoint::from_string_altform(s);
if (! is_unspecified (addr))
{
// add IP::Endpoint to fixed peers
}
}
return m_logic.autoconnect();
}
void onSiteFileFetch (
std::string const& name, SiteFiles::SiteFile const& siteFile)
void
once_per_second() override
{
parseBootstrapIPs (name, siteFile["ips"]);
m_logic.once_per_second();
}
//if (name == "local")
// parseFixedIPs (name, siteFile["ips_fixed"]);
std::vector<std::pair<Slot::ptr, std::vector<Endpoint>>>
sendpeers() override
{
return m_logic.sendpeers();
}
//--------------------------------------------------------------------------
@@ -231,20 +205,28 @@ public:
{
}
void onStart ()
void
onStart()
{
startThread();
m_journal.debug << "Initializing";
beast::Error error (m_store.open (m_databaseFile));
if (error)
m_journal.fatal <<
"Failed to open '" << m_databaseFile.getFullPathName() << "'";
if (! error)
m_logic.load ();
}
void onStop ()
{
m_journal.debug << "Stopping";
m_checker.cancel ();
m_logic.stop ();
m_secondsTimer.cancel();
m_queue.dispatch (
m_context.wrap (
checker_.stop();
m_logic.stop();
/*
signalThreadShouldExit();
m_queue.dispatch (m_context.wrap (
std::bind (&Thread::signalThreadShouldExit, this)));
*/
}
//--------------------------------------------------------------------------
@@ -255,62 +237,8 @@ public:
void onWrite (beast::PropertyStream::Map& map)
{
SerializedContext::Scope scope (m_context);
m_logic.onWrite (map);
}
//--------------------------------------------------------------------------
void onDeadlineTimer (beast::DeadlineTimer& timer)
{
if (timer == m_secondsTimer)
{
m_queue.dispatch (
m_context.wrap (
std::bind (&Logic::periodicActivity, &m_logic)));
m_secondsTimer.setExpiration (Tuning::secondsPerConnect);
}
}
void init ()
{
m_journal.debug << "Initializing";
beast::Error error (m_store.open (m_databaseFile));
if (error)
{
m_journal.fatal <<
"Failed to open '" << m_databaseFile.getFullPathName() << "'";
}
if (! error)
{
m_logic.load ();
}
m_secondsTimer.setExpiration (std::chrono::seconds (1));
}
void run ()
{
m_journal.debug << "Started";
init ();
m_siteFiles.addListener (*this);
while (! this->threadShouldExit())
{
m_queue.run_one();
}
m_siteFiles.removeListener (*this);
stopped();
}
};
//------------------------------------------------------------------------------
@@ -321,16 +249,10 @@ Manager::Manager (Stoppable& parent)
{
}
Manager* Manager::New (
Stoppable& parent,
SiteFiles::Manager& siteFiles,
beast::File const& databaseFile,
Callback& callback,
clock_type& clock,
beast::Journal journal)
Manager* Manager::New (Stoppable& parent, beast::File const& databaseFile,
clock_type& clock, beast::Journal journal)
{
return new ManagerImp (parent, siteFiles, databaseFile,
callback, clock, journal);
return new ManagerImp (parent, databaseFile, clock, journal);
}
}

View File

@@ -20,6 +20,7 @@
#ifndef RIPPLE_PEERFINDER_SOURCE_H_INCLUDED
#define RIPPLE_PEERFINDER_SOURCE_H_INCLUDED
#include <beast/smart_ptr/SharedObject.h>
#include <boost/system/error_code.hpp>
namespace ripple {

View File

@@ -21,6 +21,7 @@
#define RIPPLE_PEERFINDER_STORESQDB_H_INCLUDED
#include <beast/module/sqdb/sqdb.h>
#include <beast/utility/Debug.h>
namespace ripple {
namespace PeerFinder {

View File

@@ -499,7 +499,7 @@ public:
{
}
void async_test (IP::Endpoint const& address,
void async_connect (IP::Endpoint const& address,
asio::shared_handler <void (Result)> handler)
{
Node* const node (m_network.find (address));

View File

@@ -43,7 +43,7 @@ public:
/** Destroy the object.
Any pending fetch operations are aborted.
*/
virtual ~Manager () { }
virtual ~Manager() = default;
/** Adds a listener. */
virtual void addListener (Listener& listener) = 0;

View File

@@ -26,7 +26,6 @@
#endif
#include <ripple/peerfinder/impl/Bootcache.cpp>
#include <ripple/peerfinder/impl/Checker.cpp>
#include <ripple/peerfinder/impl/Config.cpp>
#include <ripple/peerfinder/impl/Endpoint.cpp>
#include <ripple/peerfinder/impl/Livecache.cpp>