HTTP Handshaking for Peers on Universal Port (RIPD-446):

This introduces a considerable change in the way that peers handshake. Instead
of sending the TMHello protocol message, the peer making the connection (client
role) sends an HTTP Upgrade request along with some special headers. The peer
acting in the server role sends an HTTP response completing the upgrade and
transition to RTXP (Ripple Transaction Protocol, a.k.a. peer protocol). If the
server has no available slots, then it sends a 503 Service Unavailable HTTP
response with a JSON content-body containing IP addresses of other servers to
try. The information that was previously contained in the TMHello message is
now communicated in the HTTP request and HTTP response including the secure
cookie to prevent man in the middle attacks. This information is documented
in the overlay README.md file.

To prevent disruption on the network, the handshake feature is rolled out in
two parts. This is part 1, where new servents acting in the client role will
send the old style TMHello handshake, and new servents acting in the server
role can automatically detect and accept both the old style TMHello handshake,
or the HTTP request accordingly. This detection happens in the Server module,
which supports the universal port. An experimental .cfg setting allows clients
to instead send HTTP handshakes when establishing peer connections. When this
code has reached a significant fraction of the network, these clients will be
able to establish a connection to the Ripple network using HTTP handshakes.

These changes clean up the handling of the socket for peers. It fixes a long
standing bug in the graceful close sequence, where remaining data such as the
IP addresses of other servers to try, did not get sent. Redundant state
variables for the peer are removed and the treatment of completion handlers is
streamlined. The treatment of SSL short reads and secure shutdown is also fixed.

Logging for the peers in the overlay module are divided into two partitions:
"Peer" and "Protocol". The Peer partition records activity taking place on the
socket while the Protocol partition informs about RTXP specific actions such as
transaction relay, fetch packs, and consensus rounds. The severity on the log
partitions may be adjusted independently to diagnose problems. Every log
message for peers is prefixed with a small, unique integer id in brackets,
to accurately associate log messages with peers.

HTTP handshaking is the first step in implementing the Hub and Spoke feature,
which transforms the network from a homogeneous network where all peers are
the same, into a structured network where peers with above average capabilities
in their ability to process ledgers and transactions self-assemble to form a
backbone of high powered machines which in turn serve a much larger number of
'leaves' with lower capacities with a goal to improve the number of
transactions that may be retired over time.
This commit is contained in:
Vinnie Falco
2014-11-02 05:36:14 -08:00
parent 30123eaa4a
commit d4fd5e4fce
29 changed files with 1955 additions and 1570 deletions

View File

@@ -22,8 +22,11 @@
#include <ripple/server/JsonWriter.h>
#include <ripple/overlay/impl/OverlayImpl.h>
#include <ripple/overlay/impl/PeerImp.h>
#include <ripple/overlay/impl/TMHello.h>
#include <ripple/peerfinder/make_Manager.h>
#include <beast/ByteOrder.h>
#include <beast/http/rfc2616.h>
#include <beast/utility/ci_char_traits.h>
namespace ripple {
@@ -69,7 +72,7 @@ OverlayImpl::Timer::Timer (OverlayImpl& overlay)
}
void
OverlayImpl::Timer::close()
OverlayImpl::Timer::stop()
{
error_code ec;
timer_.cancel(ec);
@@ -128,14 +131,14 @@ OverlayImpl::OverlayImpl (
pathToDbFileOrDirectory, get_seconds_clock(),
deprecatedLogs().journal("PeerFinder")))
, m_resolver (resolver)
, m_nextShortId (0)
, next_id_(1)
{
beast::PropertyStream::Source::add (m_peerFinder.get());
}
OverlayImpl::~OverlayImpl ()
{
close();
stop();
// Block until dependent objects have been destroyed.
// This is just to catch improper use of the Stoppable API.
@@ -149,8 +152,7 @@ OverlayImpl::~OverlayImpl ()
void
OverlayImpl::onLegacyPeerHello (
std::unique_ptr<beast::asio::ssl_bundle>&& ssl_bundle,
boost::asio::const_buffer buffer,
boost::asio::ip::tcp::endpoint remote_address)
boost::asio::const_buffer buffer, endpoint_type remote_endpoint)
{
error_code ec;
auto const local_endpoint (ssl_bundle->socket.local_endpoint(ec));
@@ -159,51 +161,130 @@ OverlayImpl::onLegacyPeerHello (
auto const slot = m_peerFinder->new_inbound_slot (
beast::IPAddressConversion::from_asio(local_endpoint),
beast::IPAddressConversion::from_asio(remote_address));
beast::IPAddressConversion::from_asio(remote_endpoint));
if (slot != nullptr)
return addpeer (std::make_shared<PeerImp>(std::move(ssl_bundle),
boost::asio::const_buffers_1(buffer),
beast::IPAddressConversion::from_asio(remote_address),
*this, m_resourceManager, *m_peerFinder, slot));
if (slot == nullptr)
// self connect, close
return;
auto const peer = std::make_shared<PeerImp>(std::move(ssl_bundle),
boost::asio::const_buffers_1(buffer), remote_endpoint, *this,
m_resourceManager, *m_peerFinder, slot, next_id_++);
{
// As we are not on the strand, run() must be called
// while holding the lock, otherwise new I/O can be
// queued after a call to stop().
std::lock_guard <decltype(mutex_)> lock (mutex_);
add(peer);
peer->run();
}
}
Handoff
OverlayImpl::onHandoff (std::unique_ptr <beast::asio::ssl_bundle>&& ssl_bundle,
beast::http::message&& request,
boost::asio::ip::tcp::endpoint remote_address)
endpoint_type remote_endpoint)
{
Handoff handoff;
if (! isPeerUpgrade(request))
return handoff;
handoff.moved = true;
if (journal_.trace) journal_.trace <<
"Peer connection upgrade from " << remote_endpoint;
error_code ec;
auto const local_endpoint (ssl_bundle->socket.local_endpoint(ec));
if (ec)
{
// log?
// Since we don't call std::move the socket will be closed.
if (journal_.trace) journal_.trace <<
"Peer " << remote_endpoint << " failed: " << ec.message();
return handoff;
}
auto consumer = m_resourceManager.newInboundEndpoint(
beast::IPAddressConversion::from_asio(remote_endpoint));
if (consumer.disconnect())
return handoff;
auto const slot = m_peerFinder->new_inbound_slot (
beast::IPAddressConversion::from_asio(local_endpoint),
beast::IPAddressConversion::from_asio(remote_endpoint));
if (slot == nullptr)
{
// self-connect, close
handoff.moved = false;
return handoff;
}
// TODO Validate HTTP request
auto const slot = m_peerFinder->new_inbound_slot (
beast::IPAddressConversion::from_asio(local_endpoint),
beast::IPAddressConversion::from_asio(remote_address));
if (slot == nullptr)
{
// self connect
auto const types = beast::rfc2616::split_commas(
request.headers["Connect-As"]);
if (std::find_if(types.begin(), types.end(),
[](std::string const& s)
{
return beast::ci_equal(s,
std::string("peer"));
}) == types.end())
{
handoff.moved = false;
handoff.response = makeRedirectResponse(slot, request,
remote_endpoint.address());
handoff.keep_alive = request.keep_alive();
return handoff;
}
}
handoff.moved = true;
bool success = true;
protocol::TMHello hello;
std::tie(hello, success) = parseHello (request, journal_);
if(! success)
return handoff;
uint256 sharedValue;
std::tie(sharedValue, success) = makeSharedValue(
ssl_bundle->stream.native_handle(), journal_);
if(! success)
return handoff;
RippleAddress publicKey;
std::tie(publicKey, success) = verifyHello (hello,
sharedValue, journal_, getApp());
if(! success)
return handoff;
std::string name;
bool clusterNode = getApp().getUNL().nodeInCluster(
publicKey, name);
auto const result = m_peerFinder->activate (slot,
RipplePublicKey(publicKey), clusterNode);
if (result != PeerFinder::Result::success)
{
if (journal_.trace) journal_.trace <<
"Peer " << remote_endpoint << " redirected, slots full";
handoff.moved = false;
handoff.response = makeRedirectResponse(slot, request,
remote_endpoint.address());
handoff.keep_alive = request.keep_alive();
return handoff;
}
// For now, always redirect
// Full, give them some addresses
handoff.response = makeRedirectResponse(slot, request);
handoff.keep_alive = request.keep_alive();
auto const peer = std::make_shared<PeerImp>(std::move(ssl_bundle),
std::move(request), hello, remote_endpoint, publicKey, consumer,
slot, *this, m_resourceManager, *m_peerFinder, next_id_++);
{
// As we are not on the strand, run() must be called
// while holding the lock, otherwise new I/O can be
// queued after a call to stop().
std::lock_guard <decltype(mutex_)> lock (mutex_);
add(peer);
peer->run();
}
handoff.moved = true;
return handoff;
}
@@ -214,14 +295,18 @@ OverlayImpl::isPeerUpgrade(beast::http::message const& request)
{
if (! request.upgrade())
return false;
if (request.headers["Upgrade"] != "Ripple/1.2")
auto const versions = parse_ProtocolVersions(
request.headers["Upgrade"]);
if (versions.size() == 0)
return false;
if (! request.request() && request.status() != 101)
return false;
return true;
}
std::shared_ptr<HTTP::Writer>
OverlayImpl::makeRedirectResponse (PeerFinder::Slot::ptr const& slot,
beast::http::message const& request)
beast::http::message const& request, address_type remote_address)
{
Json::Value json(Json::objectValue);
{
@@ -235,6 +320,7 @@ OverlayImpl::makeRedirectResponse (PeerFinder::Slot::ptr const& slot,
m.request(false);
m.status(503);
m.reason("Service Unavailable");
m.headers.append("Remote-Address", remote_address.to_string());
m.version(request.version());
if (request.version() == std::make_pair(1, 0))
{
@@ -251,20 +337,20 @@ OverlayImpl::connect (beast::IP::Endpoint const& remote_endpoint)
{
assert(work_);
PeerFinder::Slot::ptr const slot (
m_peerFinder->new_outbound_slot (remote_endpoint));
PeerFinder::Slot::ptr const slot =
m_peerFinder->new_outbound_slot (remote_endpoint);
if (slot == nullptr)
return;
addpeer (std::make_shared <PeerImp> (remote_endpoint, io_service_, *this,
m_resourceManager, *m_peerFinder, slot, setup_.context));
}
Peer::ShortId
OverlayImpl::next_id()
{
return ++m_nextShortId;
auto const peer = std::make_shared <PeerImp> (remote_endpoint,
io_service_, *this, m_resourceManager, *m_peerFinder, slot,
setup_.context, next_id_++);
{
// We're on the strand but lets make this code
// the same as the others to avoid confusion.
std::lock_guard <decltype(mutex_)> lock (mutex_);
add(peer);
peer->run();
}
}
//--------------------------------------------------------------------------
@@ -273,8 +359,8 @@ void
OverlayImpl::remove (PeerFinder::Slot::ptr const& slot)
{
std::lock_guard <decltype(mutex_)> lock (mutex_);
PeersBySlot::iterator const iter (m_peers.find (slot));
assert (iter != m_peers.end ());
auto const iter = m_peers.find (slot);
assert(iter != m_peers.end ());
m_peers.erase (iter);
}
@@ -284,6 +370,14 @@ OverlayImpl::remove (PeerFinder::Slot::ptr const& slot)
//
//--------------------------------------------------------------------------
// Caller must hold the mutex
void
OverlayImpl::checkStopped ()
{
if (isStopping() && areChildrenStopped () && list_.empty())
stopped();
}
void
OverlayImpl::onPrepare()
{
@@ -321,17 +415,14 @@ OverlayImpl::onPrepare()
if (!bootstrapIps.empty ())
{
m_resolver.resolve (bootstrapIps,
[this](
std::string const& name,
[this](std::string const& name,
std::vector <beast::IP::Endpoint> const& addresses)
{
std::vector <std::string> ips;
ips.reserve(addresses.size());
for (auto const& addr : addresses)
ips.push_back (to_string (addr));
std::string const base ("config: ");
if (!ips.empty ())
m_peerFinder->addFallbackStrings (base + name, ips);
});
@@ -364,7 +455,7 @@ OverlayImpl::onStart ()
void
OverlayImpl::onStop ()
{
strand_.dispatch(std::bind(&OverlayImpl::close, this));
strand_.dispatch(std::bind(&OverlayImpl::stop, this));
}
void
@@ -392,7 +483,7 @@ OverlayImpl::onWrite (beast::PropertyStream::Map& stream)
are known.
*/
void
OverlayImpl::activate (Peer::ptr const& peer)
OverlayImpl::activate (std::shared_ptr<PeerImp> const& peer)
{
std::lock_guard <decltype(mutex_)> lock (mutex_);
@@ -400,41 +491,35 @@ OverlayImpl::activate (Peer::ptr const& peer)
{
auto const result (m_shortIdMap.emplace (
std::piecewise_construct,
std::make_tuple (peer->getShortId()),
std::make_tuple (peer->id()),
std::make_tuple (peer)));
assert(result.second);
(void) result.second;
}
{
auto const result (m_publicKeyMap.emplace (
std::piecewise_construct,
std::make_tuple (peer->getNodePublic()),
std::make_tuple (peer)));
auto const result (m_publicKeyMap.emplace(
peer->getNodePublic(), peer));
assert(result.second);
(void) result.second;
}
journal_.debug <<
"activated " << peer->getRemoteAddress() <<
" (" << peer->getShortId() <<
" (" << peer->id() <<
":" << RipplePublicKey(peer->getNodePublic()) << ")";
// We just accepted this peer so we have non-zero active peers
assert(size() != 0);
}
/** A peer is being disconnected
This is called during the disconnection of a known, activated peer. It
will not be called for outbound peer connections that don't succeed or
for connections of peers that are dropped prior to being activated.
*/
void
OverlayImpl::onPeerDisconnect (Peer::ptr const& peer)
OverlayImpl::onPeerDeactivate (Peer::id_t id,
RippleAddress const& publicKey)
{
std::lock_guard <decltype(mutex_)> lock (mutex_);
m_shortIdMap.erase (peer->getShortId ());
m_publicKeyMap.erase (peer->getNodePublic ());
m_shortIdMap.erase(id);
m_publicKeyMap.erase(publicKey);
}
/** The number of active peers on the network
@@ -456,36 +541,45 @@ OverlayImpl::json ()
}
Overlay::PeerSequence
OverlayImpl::getActivePeers ()
OverlayImpl::getActivePeers()
{
Overlay::PeerSequence ret;
std::lock_guard <decltype(mutex_)> lock (mutex_);
ret.reserve (m_publicKeyMap.size ());
for (auto const& e : m_publicKeyMap)
{
assert (e.second);
ret.push_back (e.second);
auto const sp = e.second.lock();
if (sp)
ret.push_back(sp);
}
return ret;
}
Peer::ptr
OverlayImpl::findPeerByShortID (Peer::ShortId const& id)
OverlayImpl::findPeerByShortID (Peer::id_t const& id)
{
std::lock_guard <decltype(mutex_)> lock (mutex_);
PeerByShortId::iterator const iter (
m_shortIdMap.find (id));
auto const iter = m_shortIdMap.find (id);
if (iter != m_shortIdMap.end ())
return iter->second;
return iter->second.lock();
return Peer::ptr();
}
//------------------------------------------------------------------------------
void
OverlayImpl::add (std::shared_ptr<PeerImp> const& peer)
{
{
auto const result =
m_peers.emplace (peer->slot(), peer);
assert (result.second);
(void) result.second;
}
list_.emplace(peer.get(), peer);
}
void
OverlayImpl::remove (Child& child)
{
@@ -495,16 +589,8 @@ OverlayImpl::remove (Child& child)
checkStopped();
}
// Caller must hold the mutex
void
OverlayImpl::checkStopped ()
{
if (isStopping() && areChildrenStopped () && list_.empty())
stopped();
}
void
OverlayImpl::close()
OverlayImpl::stop()
{
std::lock_guard<decltype(mutex_)> lock(mutex_);
if (work_)
@@ -515,28 +601,11 @@ OverlayImpl::close()
auto const child = _.second.lock();
// Happens when the child is about to be destroyed
if (child != nullptr)
child->close();
child->stop();
}
}
}
void
OverlayImpl::addpeer (std::shared_ptr<PeerImp> const& peer)
{
std::lock_guard <decltype(mutex_)> lock (mutex_);
{
std::pair <PeersBySlot::iterator, bool> const result (
m_peers.emplace (peer->slot(), peer));
assert (result.second);
(void) result.second;
}
list_.emplace(peer.get(), peer);
// This has to happen while holding the lock,
// otherwise the socket might not be canceled during a stop.
peer->start();
}
void
OverlayImpl::autoConnect()
{
@@ -551,16 +620,15 @@ OverlayImpl::sendEndpoints()
auto const result = m_peerFinder->buildEndpointsForPeers();
for (auto const& e : result)
{
// VFALCO TODO Make sure this doesn't race with closing the peer
PeerImp::ptr peer;
std::shared_ptr<PeerImp> peer;
{
std::lock_guard <decltype(mutex_)> lock (mutex_);
PeersBySlot::iterator const iter = m_peers.find (e.first);
auto const iter = m_peers.find (e.first);
if (iter != m_peers.end())
peer = iter->second.lock();
}
if (peer)
peer->send_endpoints (e.second.begin(), e.second.end());
peer->sendEndpoints (e.second.begin(), e.second.end());
}
}