Files
xahaud/src/ripple/overlay/impl/OverlayImpl.cpp
Vinnie Falco 856fd9d69f Add [overlay] configuration section (experimental):
This configuration section uses the new BasicConfig interface that supports
key-value pairs in the section. Some exposition is added to the example cfg
file. The new settings for overlay are related to the Hub and Spoke feature
which is currently in development. Production servers should not set
these configuration options, they are clearly marked experimental in the
example cfg file.

Conflicts:
	src/ripple/overlay/impl/OverlayImpl.cpp
	src/ripple/overlay/impl/OverlayImpl.h
	src/ripple/overlay/impl/PeerImp.cpp
	src/ripple/overlay/impl/PeerImp.h
2014-10-23 12:56:16 -07:00

543 lines
15 KiB
C++

//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/overlay/impl/OverlayImpl.h>
#include <ripple/overlay/impl/PeerDoor.h>
#include <ripple/overlay/impl/PeerImp.h>
#include <beast/ByteOrder.h>
#if DOXYGEN
#include <ripple/overlay/README.md>
#endif
namespace ripple {
/** A functor to visit all active peers and retrieve their JSON data */
struct get_peer_json
{
typedef Json::Value return_type;
Json::Value json;
get_peer_json ()
{ }
void operator() (Peer::ptr const& peer)
{
json.append (peer->json ());
}
Json::Value operator() ()
{
return json;
}
};
//------------------------------------------------------------------------------
OverlayImpl::OverlayImpl (Stoppable& parent,
Resource::Manager& resourceManager,
SiteFiles::Manager& siteFiles,
beast::File const& pathToDbFileOrDirectory,
Resolver& resolver,
boost::asio::io_service& io_service,
boost::asio::ssl::context& ssl_context)
: Overlay (parent)
, m_child_count (1)
, m_journal (deprecatedLogs().journal("Overlay"))
, m_resourceManager (resourceManager)
, m_peerFinder (add (PeerFinder::Manager::New (*this,
pathToDbFileOrDirectory, get_seconds_clock (),
deprecatedLogs().journal("PeerFinder"))))
, m_io_service (io_service)
, m_ssl_context (ssl_context)
, timer_(io_service)
, m_resolver (resolver)
, m_nextShortId (0)
{
auto const& section = getConfig()["overlay"];
set (setup_.use_handshake, "use_handshake", section);
set (setup_.auto_connect, "auto_connect", section);
std::string promote;
set (promote, "become_superpeer", section);
if (promote == "never")
setup_.promote = Promote::never;
else if (promote == "always")
setup_.promote = Promote::always;
else
setup_.promote = Promote::automatic;
}
OverlayImpl::~OverlayImpl ()
{
// Block until dependent objects have been destroyed.
// This is just to catch improper use of the Stoppable API.
//
std::unique_lock <decltype(m_mutex)> lock (m_mutex);
m_cond.wait (lock, [this] {
return this->m_child_count == 0; });
}
OverlayImpl::Setup const&
OverlayImpl::setup() const
{
return setup_;
}
void
OverlayImpl::accept (socket_type&& socket)
{
// An error getting an endpoint means the connection closed.
// Just do nothing and the socket will be closed by the caller.
boost::system::error_code ec;
auto const local_endpoint_native (socket.local_endpoint (ec));
if (ec)
return;
auto const remote_endpoint_native (socket.remote_endpoint (ec));
if (ec)
return;
auto const local_endpoint (
beast::IPAddressConversion::from_asio (local_endpoint_native));
auto const remote_endpoint (
beast::IPAddressConversion::from_asio (remote_endpoint_native));
PeerFinder::Slot::ptr const slot (m_peerFinder->new_inbound_slot (
local_endpoint, remote_endpoint));
if (slot == nullptr)
return;
PeerImp::ptr const peer (std::make_shared <PeerImp> (
std::move (socket), remote_endpoint, *this, m_resourceManager,
*m_peerFinder, slot, m_ssl_context));
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
{
std::pair <PeersBySlot::iterator, bool> const result (
m_peers.emplace (slot, peer));
assert (result.second);
(void) result.second;
}
++m_child_count;
// This has to happen while holding the lock,
// otherwise the socket might not be canceled during a stop.
peer->start ();
}
}
void
OverlayImpl::connect (beast::IP::Endpoint const& remote_endpoint)
{
if (isStopping())
{
m_journal.debug <<
"Skipping " << remote_endpoint <<
" connect on stop";
return;
}
PeerFinder::Slot::ptr const slot (
m_peerFinder->new_outbound_slot (remote_endpoint));
if (slot == nullptr)
return;
PeerImp::ptr const peer (std::make_shared <PeerImp> (
remote_endpoint, m_io_service, *this, m_resourceManager,
*m_peerFinder, slot, m_ssl_context));
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
{
std::pair <PeersBySlot::iterator, bool> const result (
m_peers.emplace (slot, peer));
assert (result.second);
(void) result.second;
}
++m_child_count;
// This has to happen while holding the lock,
// otherwise the socket might not be canceled during a stop.
peer->start ();
}
}
Peer::ShortId
OverlayImpl::next_id()
{
return ++m_nextShortId;
}
//--------------------------------------------------------------------------
// Check for the stopped condition
// Caller must hold the mutex
void
OverlayImpl::check_stopped ()
{
// To be stopped, child Stoppable objects must be stopped
// and the count of dependent objects must be zero
if (areChildrenStopped () && m_child_count == 0)
{
m_cond.notify_all ();
m_journal.info <<
"Stopped.";
stopped ();
}
}
// Decrement the count of dependent objects
// Caller must hold the mutex
void
OverlayImpl::release ()
{
if (--m_child_count == 0)
check_stopped ();
}
void
OverlayImpl::remove (PeerFinder::Slot::ptr const& slot)
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
PeersBySlot::iterator const iter (m_peers.find (slot));
assert (iter != m_peers.end ());
m_peers.erase (iter);
release();
}
//--------------------------------------------------------------------------
//
// Stoppable
//
//--------------------------------------------------------------------------
void
OverlayImpl::onPrepare ()
{
PeerFinder::Config config;
if (getConfig ().PEERS_MAX != 0)
config.maxPeers = getConfig ().PEERS_MAX;
config.outPeers = config.calcOutPeers();
config.wantIncoming =
(! getConfig ().PEER_PRIVATE) &&
(getConfig().peerListeningPort != 0);
// if it's a private peer or we are running as standalone
// automatic connections would defeat the purpose.
config.autoConnect =
!getConfig().RUN_STANDALONE &&
!getConfig().PEER_PRIVATE;
config.listeningPort = getConfig().peerListeningPort;
config.features = "";
// Enforce business rules
config.applyTuning ();
m_peerFinder->setConfig (config);
auto bootstrapIps (getConfig ().IPS);
// If no IPs are specified, use the Ripple Labs round robin
// pool to get some servers to insert into the boot cache.
if (bootstrapIps.empty ())
bootstrapIps.push_back ("r.ripple.com 51235");
if (!bootstrapIps.empty ())
{
m_resolver.resolve (bootstrapIps,
[this](
std::string const& name,
std::vector <beast::IP::Endpoint> const& addresses)
{
std::vector <std::string> ips;
for (auto const& addr : addresses)
ips.push_back (to_string (addr));
std::string const base ("config: ");
if (!ips.empty ())
m_peerFinder->addFallbackStrings (base + name, ips);
});
}
// Add the ips_fixed from the rippled.cfg file
if (! getConfig ().RUN_STANDALONE && !getConfig ().IPS_FIXED.empty ())
{
m_resolver.resolve (getConfig ().IPS_FIXED,
[this](
std::string const& name,
std::vector <beast::IP::Endpoint> const& addresses)
{
if (!addresses.empty ())
m_peerFinder->addFixedPeer (name, addresses);
});
}
// Configure the peer doors, which allow the server to accept incoming
// peer connections:
if (! getConfig ().RUN_STANDALONE)
{
m_doorDirect = make_PeerDoor (*this, getConfig ().PEER_IP,
getConfig ().peerListeningPort, m_io_service);
}
}
void
OverlayImpl::onStart ()
{
// mutex not needed since we aren't running
++m_child_count;
boost::asio::spawn (m_io_service, std::bind (
&OverlayImpl::do_timer, this, std::placeholders::_1));
}
/** Close all peer connections.
If `graceful` is true then active
Requirements:
Caller must hold the mutex.
*/
void
OverlayImpl::close_all (bool graceful)
{
for (auto const& entry : m_peers)
{
PeerImp::ptr const peer (entry.second.lock());
// VFALCO The only case where the weak_ptr is expired should be if
// ~PeerImp is pre-empted before it calls m_peers.remove()
//
if (peer != nullptr)
peer->close();
}
}
void
OverlayImpl::onStop ()
{
error_code ec;
timer_.cancel(ec);
if (m_doorDirect)
m_doorDirect->stop();
if (m_doorProxy)
m_doorProxy->stop();
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
// Take off the extra count we added in the constructor
release();
close_all (false);
}
void
OverlayImpl::onChildrenStopped ()
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
check_stopped ();
}
//--------------------------------------------------------------------------
//
// PropertyStream
//
//--------------------------------------------------------------------------
void
OverlayImpl::onWrite (beast::PropertyStream::Map& stream)
{
}
//--------------------------------------------------------------------------
/** A peer has connected successfully
This is called after the peer handshake has been completed and during
peer activation. At this point, the peer address and the public key
are known.
*/
void
OverlayImpl::activate (Peer::ptr const& peer)
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
// Now track this peer
{
auto const result (m_shortIdMap.emplace (
std::piecewise_construct,
std::make_tuple (peer->getShortId()),
std::make_tuple (peer)));
assert(result.second);
(void) result.second;
}
{
auto const result (m_publicKeyMap.emplace (
std::piecewise_construct,
std::make_tuple (peer->getNodePublic()),
std::make_tuple (peer)));
assert(result.second);
(void) result.second;
}
m_journal.debug <<
"activated " << peer->getRemoteAddress() <<
" (" << peer->getShortId() <<
":" << RipplePublicKey(peer->getNodePublic()) << ")";
// We just accepted this peer so we have non-zero active peers
assert(size() != 0);
}
/** A peer is being disconnected
This is called during the disconnection of a known, activated peer. It
will not be called for outbound peer connections that don't succeed or
for connections of peers that are dropped prior to being activated.
*/
void
OverlayImpl::onPeerDisconnect (Peer::ptr const& peer)
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
m_shortIdMap.erase (peer->getShortId ());
m_publicKeyMap.erase (peer->getNodePublic ());
}
/** The number of active peers on the network
Active peers are only those peers that have completed the handshake
and are running the Ripple protocol.
*/
std::size_t
OverlayImpl::size ()
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
return m_publicKeyMap.size ();
}
// Returns information on verified peers.
Json::Value
OverlayImpl::json ()
{
return foreach (get_peer_json());
}
Overlay::PeerSequence
OverlayImpl::getActivePeers ()
{
Overlay::PeerSequence ret;
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
ret.reserve (m_publicKeyMap.size ());
for (auto const& e : m_publicKeyMap)
{
assert (e.second);
ret.push_back (e.second);
}
return ret;
}
Peer::ptr
OverlayImpl::findPeerByShortID (Peer::ShortId const& id)
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
PeerByShortId::iterator const iter (
m_shortIdMap.find (id));
if (iter != m_shortIdMap.end ())
return iter->second;
return Peer::ptr();
}
//------------------------------------------------------------------------------
void
OverlayImpl::autoconnect()
{
auto const result = m_peerFinder->autoconnect();
for (auto addr : result)
connect (addr);
}
void
OverlayImpl::sendpeers()
{
auto const result = m_peerFinder->sendpeers();
for (auto const& e : result)
{
// VFALCO TODO Make sure this doesn't race with closing the peer
PeerImp::ptr peer;
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
PeersBySlot::iterator const iter = m_peers.find (e.first);
if (iter != m_peers.end())
peer = iter->second.lock();
}
if (peer)
peer->send_endpoints (e.second.begin(), e.second.end());
}
}
void
OverlayImpl::do_timer (yield_context yield)
{
for(;;)
{
m_peerFinder->once_per_second();
sendpeers();
autoconnect();
timer_.expires_from_now (std::chrono::seconds(1));
error_code ec;
timer_.async_wait (yield[ec]);
if (ec == boost::asio::error::operation_aborted)
break;
}
// Take off a reference
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
release();
}
//------------------------------------------------------------------------------
std::unique_ptr <Overlay>
make_Overlay (
beast::Stoppable& parent,
Resource::Manager& resourceManager,
SiteFiles::Manager& siteFiles,
beast::File const& pathToDbFileOrDirectory,
Resolver& resolver,
boost::asio::io_service& io_service,
boost::asio::ssl::context& ssl_context)
{
return std::make_unique <OverlayImpl> (parent, resourceManager, siteFiles,
pathToDbFileOrDirectory, resolver, io_service, ssl_context);
}
}