Split peer connect logic to another class (RIPD-711):

All of the logic for establishing an outbound peer connection including
the initial HTTP handshake exchange is moved into a separate class. This
allows PeerImp to have a strong invariant: All PeerImp objects that exist
represent active peer connections that have already gone through the
handshake process.
This commit is contained in:
Vinnie Falco
2014-11-22 07:36:50 -08:00
committed by Nik Bougalis
parent 930a0beaf1
commit 32062e439f
14 changed files with 963 additions and 378 deletions

View File

@@ -20,6 +20,7 @@
#include <ripple/basics/Log.h>
#include <ripple/basics/make_SSLContext.h>
#include <ripple/server/JsonWriter.h>
#include <ripple/overlay/impl/ConnectAttempt.h>
#include <ripple/overlay/impl/OverlayImpl.h>
#include <ripple/overlay/impl/PeerImp.h>
#include <ripple/overlay/impl/TMHello.h>
@@ -27,6 +28,7 @@
#include <beast/ByteOrder.h>
#include <beast/http/rfc2616.h>
#include <beast/utility/ci_char_traits.h>
#include <beast/utility/WrappedSink.h>
namespace ripple {
@@ -185,21 +187,25 @@ OverlayImpl::onHandoff (std::unique_ptr <beast::asio::ssl_bundle>&& ssl_bundle,
beast::http::message&& request,
endpoint_type remote_endpoint)
{
auto const id = next_id_++;
beast::WrappedSink sink (deprecatedLogs()["Peer"], makePrefix(id));
beast::Journal journal (sink);
Handoff handoff;
if (! isPeerUpgrade(request))
return handoff;
handoff.moved = true;
if (journal_.trace) journal_.trace <<
if (journal.trace) journal.trace <<
"Peer connection upgrade from " << remote_endpoint;
error_code ec;
auto const local_endpoint (ssl_bundle->socket.local_endpoint(ec));
if (ec)
{
if (journal_.trace) journal_.trace <<
"Peer " << remote_endpoint << " failed: " << ec.message();
if (journal.trace) journal.trace <<
remote_endpoint << " failed: " << ec.message();
return handoff;
}
@@ -243,19 +249,19 @@ OverlayImpl::onHandoff (std::unique_ptr <beast::asio::ssl_bundle>&& ssl_bundle,
bool success = true;
protocol::TMHello hello;
std::tie(hello, success) = parseHello (request, journal_);
std::tie(hello, success) = parseHello (request, journal);
if(! success)
return handoff;
uint256 sharedValue;
std::tie(sharedValue, success) = makeSharedValue(
ssl_bundle->stream.native_handle(), journal_);
ssl_bundle->stream.native_handle(), journal);
if(! success)
return handoff;
RippleAddress publicKey;
std::tie(publicKey, success) = verifyHello (hello,
sharedValue, journal_, getApp());
sharedValue, journal, getApp());
if(! success)
return handoff;
@@ -267,7 +273,7 @@ OverlayImpl::onHandoff (std::unique_ptr <beast::asio::ssl_bundle>&& ssl_bundle,
RipplePublicKey(publicKey), cluster);
if (result != PeerFinder::Result::success)
{
if (journal_.trace) journal_.trace <<
if (journal.trace) journal.trace <<
"Peer " << remote_endpoint << " redirected, slots full";
handoff.moved = false;
handoff.response = makeRedirectResponse(slot, request,
@@ -276,7 +282,7 @@ OverlayImpl::onHandoff (std::unique_ptr <beast::asio::ssl_bundle>&& ssl_bundle,
return handoff;
}
auto const peer = std::make_shared<PeerImp>(next_id_++,
auto const peer = std::make_shared<PeerImp>(id,
remote_endpoint, slot, std::move(request), hello, publicKey,
consumer, std::move(ssl_bundle), *this);
{
@@ -307,6 +313,14 @@ OverlayImpl::isPeerUpgrade(beast::http::message const& request)
return true;
}
std::string
OverlayImpl::makePrefix (std::uint32_t id)
{
std::stringstream ss;
ss << "[" << std::setfill('0') << std::setw(3) << id << "] ";
return ss.str();
}
std::shared_ptr<HTTP::Writer>
OverlayImpl::makeRedirectResponse (PeerFinder::Slot::ptr const& slot,
beast::http::message const& request, address_type remote_address)
@@ -340,23 +354,69 @@ OverlayImpl::connect (beast::IP::Endpoint const& remote_endpoint)
{
assert(work_);
PeerFinder::Slot::ptr const slot =
m_peerFinder->new_outbound_slot (remote_endpoint);
if (slot == nullptr)
return;
auto const peer = std::make_shared<PeerImp>(next_id_++,
remote_endpoint, slot, io_service_, setup_.context, *this);
auto usage = resourceManager().newOutboundEndpoint (remote_endpoint);
if (usage.disconnect())
{
// We're on the strand but lets make this code
// the same as the others to avoid confusion.
std::lock_guard <decltype(mutex_)> lock (mutex_);
add(peer);
peer->run();
if (journal_.info) journal_.info <<
"Over resource limit: " << remote_endpoint;
return;
}
auto const p = std::make_shared<ConnectAttempt>(
io_service_, beast::IPAddressConversion::to_asio_endpoint(remote_endpoint),
usage, setup_.context, next_id_++,
deprecatedLogs().journal("Peer"), *this);
std::lock_guard<decltype(mutex_)> lock(mutex_);
list_.emplace(p.get(), p);
p->run();
}
//------------------------------------------------------------------------------
// Adds a peer that is already handshaked and active
void
OverlayImpl::add_active (std::shared_ptr<PeerImp> const& peer)
{
std::lock_guard <decltype(mutex_)> lock (mutex_);
{
auto const result =
m_peers.emplace (peer->slot(), peer);
assert (result.second);
(void) result.second;
}
// Now track this peer
{
auto const result (m_shortIdMap.emplace (
std::piecewise_construct,
std::make_tuple (peer->id()),
std::make_tuple (peer)));
assert(result.second);
(void) result.second;
}
{
auto const result (m_publicKeyMap.emplace(
peer->getNodePublic(), peer));
assert(result.second);
(void) result.second;
}
list_.emplace(peer.get(), peer);
journal_.debug <<
"activated " << peer->getRemoteAddress() <<
" (" << peer->id() <<
":" << RipplePublicKey(peer->getNodePublic()) << ")";
// As we are not on the strand, run() must be called
// while holding the lock, otherwise new I/O can be
// queued after a call to stop().
peer->run();
}
void
OverlayImpl::remove (PeerFinder::Slot::ptr const& slot)
{