mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-23 12:35:50 +00:00
Always use HTTP handshaking in overlay:
Inbound and outbound peer connections always use HTTP handshakes to negotiate connections, instead of the deprecated TMHello protocol message. rippled versions 0.27.0 and later support both optional HTTP handshakes and legacy TMHello messages, so always using HTTP handshakes should not cause disruption. However, versions before 0.27.0 will no longer be able to participate in the overlay network - support for handshaking via the TMHello message is removed.
This commit is contained in:
committed by
Tom Ritchford
parent
e43ffa6f2b
commit
f56e37398c
@@ -374,12 +374,6 @@
|
||||
# When set, activates the autoconnect feature. This maintains outgoing
|
||||
# connections using PeerFinder's "Outgoing Connection Strategy."
|
||||
#
|
||||
# http_handshake = 0 | 1
|
||||
#
|
||||
# When set, outgoing peer connections will handshaking using a HTTP
|
||||
# request instead of the legacy TMHello protocol buffers message.
|
||||
# Incoming peer connections have their handshakes detected automatically.
|
||||
#
|
||||
# become_superpeer = 'never' | 'always' | 'auto'
|
||||
#
|
||||
# Controls the selection of peer roles:
|
||||
|
||||
@@ -63,7 +63,6 @@ public:
|
||||
struct Setup
|
||||
{
|
||||
bool auto_connect = true;
|
||||
bool http_handshake = false;
|
||||
Promote promote = Promote::automatic;
|
||||
std::shared_ptr<boost::asio::ssl::context> context;
|
||||
};
|
||||
@@ -72,13 +71,6 @@ public:
|
||||
|
||||
virtual ~Overlay() = default;
|
||||
|
||||
/** Accept a legacy protocol handshake connection. */
|
||||
virtual
|
||||
void
|
||||
onLegacyPeerHello (std::unique_ptr<beast::asio::ssl_bundle>&& ssl_bundle,
|
||||
boost::asio::const_buffer buffer,
|
||||
boost::asio::ip::tcp::endpoint remote_address) = 0;
|
||||
|
||||
/** Conditionally accept an incoming HTTP request. */
|
||||
virtual
|
||||
Handoff
|
||||
|
||||
@@ -207,9 +207,6 @@ ConnectAttempt::onHandshake (error_code ec)
|
||||
beast::IPAddressConversion::from_asio (local_endpoint)))
|
||||
return fail("Duplicate connection");
|
||||
|
||||
if (! overlay_.setup().http_handshake)
|
||||
return doLegacy();
|
||||
|
||||
bool success;
|
||||
uint256 sharedValue;
|
||||
std::tie(sharedValue, success) = makeSharedValue(
|
||||
@@ -327,188 +324,6 @@ ConnectAttempt::onShutdown (error_code ec)
|
||||
|
||||
//--------------------------------------------------------------------------
|
||||
|
||||
// Perform a legacy outgoing connection
|
||||
void
|
||||
ConnectAttempt::doLegacy()
|
||||
{
|
||||
if(journal_.trace) journal_.trace <<
|
||||
"doLegacy";
|
||||
|
||||
bool success;
|
||||
uint256 sharedValue;
|
||||
std::tie(sharedValue, success) = makeSharedValue(
|
||||
stream_.native_handle(), journal_);
|
||||
if (! success)
|
||||
return fail("hello");
|
||||
|
||||
auto const hello = buildHello(sharedValue, getApp());
|
||||
write (write_buf_, hello, protocol::mtHELLO,
|
||||
Tuning::readBufferBytes);
|
||||
|
||||
stream_.async_write_some (write_buf_.data(),
|
||||
strand_.wrap (std::bind (&ConnectAttempt::onWriteHello,
|
||||
shared_from_this(), beast::asio::placeholders::error,
|
||||
beast::asio::placeholders::bytes_transferred)));
|
||||
|
||||
// Timer gets reset after header AND body received
|
||||
setTimer();
|
||||
boost::asio::async_read (stream_, read_buf_.prepare (
|
||||
Message::kHeaderBytes), boost::asio::transfer_exactly (
|
||||
Message::kHeaderBytes), strand_.wrap (std::bind (
|
||||
&ConnectAttempt::onReadHeader, shared_from_this(),
|
||||
beast::asio::placeholders::error,
|
||||
beast::asio::placeholders::bytes_transferred)));
|
||||
}
|
||||
|
||||
void
|
||||
ConnectAttempt::onWriteHello (error_code ec, std::size_t bytes_transferred)
|
||||
{
|
||||
if(! stream_.next_layer().is_open())
|
||||
return;
|
||||
if(ec == boost::asio::error::operation_aborted)
|
||||
return;
|
||||
if(ec)
|
||||
return fail("onWriteHello", ec);
|
||||
if(journal_.trace)
|
||||
{
|
||||
if(bytes_transferred > 0) journal_.trace <<
|
||||
"onWriteHello: " << bytes_transferred << " bytes";
|
||||
else journal_.trace <<
|
||||
"onWriteHello";
|
||||
}
|
||||
|
||||
write_buf_.consume (bytes_transferred);
|
||||
if (write_buf_.size() > 0)
|
||||
return stream_.async_write_some (write_buf_.data(),
|
||||
strand_.wrap (std::bind (&ConnectAttempt::onWriteHello,
|
||||
shared_from_this(), beast::asio::placeholders::error,
|
||||
beast::asio::placeholders::bytes_transferred)));
|
||||
}
|
||||
|
||||
void
|
||||
ConnectAttempt::onReadHeader (error_code ec,
|
||||
std::size_t bytes_transferred)
|
||||
{
|
||||
if(! stream_.next_layer().is_open())
|
||||
return;
|
||||
if(ec == boost::asio::error::operation_aborted)
|
||||
return;
|
||||
if(ec == boost::asio::error::eof)
|
||||
{
|
||||
if(journal_.info) journal_.info <<
|
||||
"EOF";
|
||||
setTimer();
|
||||
return stream_.async_shutdown(strand_.wrap(std::bind(
|
||||
&ConnectAttempt::onShutdown, shared_from_this(),
|
||||
beast::asio::placeholders::error)));
|
||||
}
|
||||
if(ec)
|
||||
return fail("onReadHeader", ec);
|
||||
if(journal_.trace)
|
||||
{
|
||||
if(bytes_transferred > 0) journal_.trace <<
|
||||
"onReadHeader: " << bytes_transferred << " bytes";
|
||||
else journal_.trace <<
|
||||
"onReadHeader";
|
||||
}
|
||||
|
||||
assert(bytes_transferred == Message::kHeaderBytes);
|
||||
read_buf_.commit(bytes_transferred);
|
||||
|
||||
int const type = Message::type(read_buf_.data());
|
||||
if (type != protocol::mtHELLO)
|
||||
return fail("Expected TMHello");
|
||||
|
||||
std::size_t const bytes_needed =
|
||||
Message::size(read_buf_.data());
|
||||
|
||||
read_buf_.consume (Message::kHeaderBytes);
|
||||
|
||||
boost::asio::async_read (stream_, read_buf_.prepare(bytes_needed),
|
||||
boost::asio::transfer_exactly(bytes_needed), strand_.wrap (
|
||||
std::bind (&ConnectAttempt::onReadBody, shared_from_this(),
|
||||
beast::asio::placeholders::error,
|
||||
beast::asio::placeholders::bytes_transferred)));
|
||||
}
|
||||
|
||||
void
|
||||
ConnectAttempt::onReadBody (error_code ec,
|
||||
std::size_t bytes_transferred)
|
||||
{
|
||||
cancelTimer();
|
||||
|
||||
if(! stream_.next_layer().is_open())
|
||||
return;
|
||||
if(ec == boost::asio::error::operation_aborted)
|
||||
return;
|
||||
if(ec == boost::asio::error::eof)
|
||||
{
|
||||
if(journal_.info) journal_.info <<
|
||||
"EOF";
|
||||
setTimer();
|
||||
return stream_.async_shutdown(strand_.wrap(std::bind(
|
||||
&ConnectAttempt::onShutdown, shared_from_this(),
|
||||
beast::asio::placeholders::error)));
|
||||
}
|
||||
if(ec)
|
||||
return fail("onReadBody", ec);
|
||||
if(journal_.trace)
|
||||
{
|
||||
if(bytes_transferred > 0) journal_.trace <<
|
||||
"onReadBody: " << bytes_transferred << " bytes";
|
||||
else journal_.trace <<
|
||||
"onReadBody";
|
||||
}
|
||||
|
||||
read_buf_.commit (bytes_transferred);
|
||||
|
||||
protocol::TMHello hello;
|
||||
ZeroCopyInputStream<
|
||||
beast::asio::streambuf::const_buffers_type> stream (
|
||||
read_buf_.data());
|
||||
if (! hello.ParseFromZeroCopyStream (&stream))
|
||||
return fail("onReadBody: parse");
|
||||
read_buf_.consume (stream.ByteCount());
|
||||
|
||||
bool success;
|
||||
uint256 sharedValue;
|
||||
std::tie(sharedValue, success) = makeSharedValue(
|
||||
ssl_bundle_->stream.native_handle(), journal_);
|
||||
if(! success)
|
||||
return close(); // makeSharedValue logs
|
||||
|
||||
RippleAddress publicKey;
|
||||
std::tie(publicKey, success) = verifyHello (hello,
|
||||
sharedValue, journal_, getApp());
|
||||
if(! success)
|
||||
return close(); // verifyHello logs
|
||||
|
||||
auto protocol = BuildInfo::make_protocol(hello.protoversion());
|
||||
if(journal_.info) journal_.info <<
|
||||
"Protocol: " << to_string(protocol);
|
||||
if(journal_.info) journal_.info <<
|
||||
"Public Key: " << publicKey.humanNodePublic();
|
||||
std::string name;
|
||||
bool const cluster = getApp().getUNL().nodeInCluster(publicKey, name);
|
||||
if (cluster)
|
||||
if (journal_.info) journal_.info <<
|
||||
"Cluster name: " << name;
|
||||
|
||||
auto const result = overlay_.peerFinder().activate (
|
||||
slot_, publicKey.toPublicKey(), cluster);
|
||||
if (result != PeerFinder::Result::success)
|
||||
return fail("Outbound slots full");
|
||||
|
||||
auto const peer = std::make_shared<PeerImp>(
|
||||
std::move(ssl_bundle_), read_buf_.data(),
|
||||
std::move(slot_), usage_, std::move(hello),
|
||||
publicKey, id_, overlay_);
|
||||
|
||||
overlay_.add_active (peer);
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------
|
||||
|
||||
beast::http::message
|
||||
ConnectAttempt::makeRequest (bool crawl,
|
||||
boost::asio::ip::address const& remote_address)
|
||||
|
||||
@@ -99,11 +99,6 @@ private:
|
||||
void onRead (error_code ec, std::size_t bytes_transferred);
|
||||
void onShutdown (error_code ec);
|
||||
|
||||
void doLegacy();
|
||||
void onWriteHello (error_code ec, std::size_t bytes_transferred);
|
||||
void onReadHeader (error_code ec, std::size_t bytes_transferred);
|
||||
void onReadBody (error_code ec, std::size_t bytes_transferred);
|
||||
|
||||
static
|
||||
beast::http::message
|
||||
makeRequest (bool crawl,
|
||||
|
||||
@@ -154,37 +154,6 @@ OverlayImpl::~OverlayImpl ()
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
void
|
||||
OverlayImpl::onLegacyPeerHello (
|
||||
std::unique_ptr<beast::asio::ssl_bundle>&& ssl_bundle,
|
||||
boost::asio::const_buffer buffer, endpoint_type remote_endpoint)
|
||||
{
|
||||
error_code ec;
|
||||
auto const local_endpoint (ssl_bundle->socket.local_endpoint(ec));
|
||||
if (ec)
|
||||
return;
|
||||
|
||||
auto const slot = m_peerFinder->new_inbound_slot (
|
||||
beast::IPAddressConversion::from_asio(local_endpoint),
|
||||
beast::IPAddressConversion::from_asio(remote_endpoint));
|
||||
|
||||
if (slot == nullptr)
|
||||
// self connect, close
|
||||
return;
|
||||
|
||||
auto const peer = std::make_shared<PeerImp>(next_id_++,
|
||||
remote_endpoint, slot, boost::asio::const_buffers_1(buffer),
|
||||
std::move(ssl_bundle), *this);
|
||||
{
|
||||
// As we are not on the strand, run() must be called
|
||||
// while holding the lock, otherwise new I/O can be
|
||||
// queued after a call to stop().
|
||||
std::lock_guard <decltype(mutex_)> lock (mutex_);
|
||||
add(peer);
|
||||
peer->run();
|
||||
}
|
||||
}
|
||||
|
||||
Handoff
|
||||
OverlayImpl::onHandoff (std::unique_ptr <beast::asio::ssl_bundle>&& ssl_bundle,
|
||||
beast::http::message&& request,
|
||||
@@ -764,7 +733,6 @@ setup_Overlay (BasicConfig const& config)
|
||||
{
|
||||
Overlay::Setup setup;
|
||||
auto const& section = config.section("overlay");
|
||||
set (setup.http_handshake, "http_handshake", section);
|
||||
set (setup.auto_connect, "auto_connect", section);
|
||||
std::string promote;
|
||||
set (promote, "become_superpeer", section);
|
||||
|
||||
@@ -154,11 +154,6 @@ public:
|
||||
return setup_;
|
||||
}
|
||||
|
||||
void
|
||||
onLegacyPeerHello (std::unique_ptr<beast::asio::ssl_bundle>&& ssl_bundle,
|
||||
boost::asio::const_buffer buffer,
|
||||
endpoint_type remote_endpoint) override;
|
||||
|
||||
Handoff
|
||||
onHandoff (std::unique_ptr <beast::asio::ssl_bundle>&& bundle,
|
||||
beast::http::message&& request,
|
||||
|
||||
@@ -98,9 +98,6 @@ PeerImp::run()
|
||||
&PeerImp::run, shared_from_this()));
|
||||
if (m_inbound)
|
||||
{
|
||||
if (read_buffer_.size() > 0)
|
||||
doLegacyAccept();
|
||||
else
|
||||
doAccept();
|
||||
}
|
||||
else
|
||||
@@ -125,7 +122,7 @@ PeerImp::run()
|
||||
previousLedgerHash_.zero();
|
||||
}
|
||||
}
|
||||
doProtocolStart(false);
|
||||
doProtocolStart();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -464,17 +461,6 @@ PeerImp::onShutdown(error_code ec)
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
void PeerImp::doLegacyAccept()
|
||||
{
|
||||
assert(read_buffer_.size() > 0);
|
||||
if(journal_.debug) journal_.debug <<
|
||||
"doLegacyAccept: " << remote_address_;
|
||||
usage_ = overlay_.resourceManager().newInboundEndpoint (remote_address_);
|
||||
if (usage_.disconnect ())
|
||||
return fail("doLegacyAccept: Resources");
|
||||
doProtocolStart(true);
|
||||
}
|
||||
|
||||
void PeerImp::doAccept()
|
||||
{
|
||||
assert(read_buffer_.size() == 0);
|
||||
@@ -574,7 +560,7 @@ PeerImp::onWriteResponse (error_code ec, std::size_t bytes_transferred)
|
||||
|
||||
write_buffer_.consume (bytes_transferred);
|
||||
if (write_buffer_.size() == 0)
|
||||
return doProtocolStart(false);
|
||||
return doProtocolStart();
|
||||
|
||||
setTimer();
|
||||
stream_.async_write_some (write_buffer_.data(),
|
||||
@@ -587,20 +573,9 @@ PeerImp::onWriteResponse (error_code ec, std::size_t bytes_transferred)
|
||||
|
||||
// Protocol logic
|
||||
|
||||
// We have an encrypted connection to the peer.
|
||||
// Have it say who it is so we know to avoid redundant connections.
|
||||
// Establish that it really who we are talking to by having it sign a
|
||||
// connection detail. Also need to establish no man in the middle attack
|
||||
// is in progress.
|
||||
void
|
||||
PeerImp::doProtocolStart(bool legacy)
|
||||
PeerImp::doProtocolStart()
|
||||
{
|
||||
if (legacy && !sendHello ())
|
||||
{
|
||||
journal_.error << "Unable to send HELLO to " << remote_address_;
|
||||
return fail ("hello");
|
||||
}
|
||||
|
||||
onReadMessage(error_code(), 0);
|
||||
}
|
||||
|
||||
@@ -710,28 +685,9 @@ PeerImp::error_code
|
||||
PeerImp::onMessageBegin (std::uint16_t type,
|
||||
std::shared_ptr <::google::protobuf::Message> const& m)
|
||||
{
|
||||
error_code ec;
|
||||
|
||||
if (type == protocol::mtHELLO && state_ != State::connected)
|
||||
{
|
||||
journal_.warning <<
|
||||
"Unexpected TMHello";
|
||||
ec = invalid_argument_error();
|
||||
}
|
||||
else if (type != protocol::mtHELLO && state_ == State::connected)
|
||||
{
|
||||
journal_.warning <<
|
||||
"Expected TMHello";
|
||||
ec = invalid_argument_error();
|
||||
}
|
||||
|
||||
if (! ec)
|
||||
{
|
||||
load_event_ = getApp().getJobQueue ().getLoadEventAP (
|
||||
jtPEER, protocolMessageName(type));
|
||||
}
|
||||
|
||||
return ec;
|
||||
return error_code{};
|
||||
}
|
||||
|
||||
void
|
||||
@@ -744,117 +700,7 @@ PeerImp::onMessageEnd (std::uint16_t,
|
||||
void
|
||||
PeerImp::onMessage (std::shared_ptr <protocol::TMHello> const& m)
|
||||
{
|
||||
std::uint32_t const ourTime (getApp().getOPs ().getNetworkTimeNC ());
|
||||
std::uint32_t const minTime (ourTime - clockToleranceDeltaSeconds);
|
||||
std::uint32_t const maxTime (ourTime + clockToleranceDeltaSeconds);
|
||||
|
||||
#ifdef BEAST_DEBUG
|
||||
if (m->has_nettime ())
|
||||
{
|
||||
std::int64_t to = ourTime;
|
||||
to -= m->nettime ();
|
||||
journal_.debug <<
|
||||
"Time offset: " << to;
|
||||
}
|
||||
#endif
|
||||
|
||||
// VFALCO TODO Report these failures in the HTTP response
|
||||
|
||||
auto const protocol = BuildInfo::make_protocol(m->protoversion());
|
||||
if (m->has_nettime () &&
|
||||
((m->nettime () < minTime) || (m->nettime () > maxTime)))
|
||||
{
|
||||
if (m->nettime () > maxTime)
|
||||
{
|
||||
journal_.info <<
|
||||
"Hello: Clock off by +" << m->nettime () - ourTime;
|
||||
}
|
||||
else if (m->nettime () < minTime)
|
||||
{
|
||||
journal_.info <<
|
||||
"Hello: Clock off by -" << ourTime - m->nettime ();
|
||||
}
|
||||
}
|
||||
else if (m->protoversionmin () > to_packed (BuildInfo::getCurrentProtocol()))
|
||||
{
|
||||
journal_.info <<
|
||||
"Hello: Disconnect: Protocol mismatch [" <<
|
||||
"Peer expects " << to_string (protocol) <<
|
||||
" and we run " << to_string (BuildInfo::getCurrentProtocol()) << "]";
|
||||
}
|
||||
else if (! publicKey_.setNodePublic (m->nodepublic ()))
|
||||
{
|
||||
journal_.info <<
|
||||
"Hello: Disconnect: Bad node public key.";
|
||||
}
|
||||
else if (! publicKey_.verifyNodePublic (
|
||||
sharedValue_, m->nodeproof (), ECDSA::not_strict))
|
||||
{
|
||||
// Unable to verify they have private key for claimed public key.
|
||||
journal_.info <<
|
||||
"Hello: Disconnect: Failed to verify session.";
|
||||
}
|
||||
else
|
||||
{
|
||||
if(journal_.info) journal_.info <<
|
||||
"Protocol: " << to_string(protocol);
|
||||
if(journal_.info) journal_.info <<
|
||||
"Public Key: " << publicKey_.humanNodePublic();
|
||||
bool const cluster = getApp().getUNL().nodeInCluster(publicKey_, name_);
|
||||
if (cluster)
|
||||
if (journal_.info) journal_.info <<
|
||||
"Cluster name: " << name_;
|
||||
|
||||
assert (state_ == State::connected);
|
||||
// VFALCO TODO Remove this needless state
|
||||
state_ = State::handshaked;
|
||||
hello_ = *m;
|
||||
|
||||
auto const result = overlay_.peerFinder().activate (slot_,
|
||||
publicKey_.toPublicKey(), cluster);
|
||||
|
||||
if (result == PeerFinder::Result::success)
|
||||
{
|
||||
state_ = State::active;
|
||||
overlay_.activate(shared_from_this ());
|
||||
|
||||
// XXX Set timer: connection is in grace period to be useful.
|
||||
// XXX Set timer: connection idle (idle may vary depending on connection type.)
|
||||
if ((hello_.has_ledgerclosed ()) && (
|
||||
hello_.ledgerclosed ().size () == (256 / 8)))
|
||||
{
|
||||
memcpy (closedLedgerHash_.begin (),
|
||||
hello_.ledgerclosed ().data (), 256 / 8);
|
||||
if ((hello_.has_ledgerprevious ()) &&
|
||||
(hello_.ledgerprevious ().size () == (256 / 8)))
|
||||
{
|
||||
memcpy (previousLedgerHash_.begin (),
|
||||
hello_.ledgerprevious ().data (), 256 / 8);
|
||||
addLedger (previousLedgerHash_);
|
||||
}
|
||||
else
|
||||
{
|
||||
previousLedgerHash_.zero();
|
||||
}
|
||||
}
|
||||
|
||||
return sendGetPeers();
|
||||
}
|
||||
|
||||
if (result == PeerFinder::Result::full)
|
||||
{
|
||||
// TODO Provide correct HTTP response
|
||||
auto const redirects = overlay_.peerFinder().redirect (slot_);
|
||||
sendEndpoints (redirects.begin(), redirects.end());
|
||||
return gracefulClose();
|
||||
}
|
||||
else if (result == PeerFinder::Result::duplicate)
|
||||
{
|
||||
return fail("Duplicate public key");
|
||||
}
|
||||
}
|
||||
|
||||
fail("TMHello invalid");
|
||||
fail("Deprecated TMHello");
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@@ -156,13 +156,6 @@ public:
|
||||
std::unique_ptr<beast::asio::ssl_bundle>&& ssl_bundle,
|
||||
OverlayImpl& overlay);
|
||||
|
||||
/** Create an incoming legacy peer from an established ssl connection. */
|
||||
template <class ConstBufferSequence>
|
||||
PeerImp (id_t id, endpoint_type remote_endpoint,
|
||||
PeerFinder::Slot::ptr const& slot, ConstBufferSequence const& buffer,
|
||||
std::unique_ptr<beast::asio::ssl_bundle>&& ssl_bundle,
|
||||
OverlayImpl& overlay);
|
||||
|
||||
/** Create outgoing, handshaked peer. */
|
||||
// VFALCO legacyPublicKey should be implied by the Slot
|
||||
template <class Buffers>
|
||||
@@ -306,9 +299,6 @@ private:
|
||||
void
|
||||
doAccept();
|
||||
|
||||
void
|
||||
doLegacyAccept();
|
||||
|
||||
static
|
||||
beast::http::message
|
||||
makeResponse (bool crawl, beast::http::message const& req,
|
||||
@@ -323,7 +313,7 @@ private:
|
||||
|
||||
// Starts the protocol message loop
|
||||
void
|
||||
doProtocolStart (bool legacy);
|
||||
doProtocolStart ();
|
||||
|
||||
// Called when protocol message bytes are received
|
||||
void
|
||||
@@ -434,34 +424,6 @@ private:
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
template <class ConstBufferSequence>
|
||||
PeerImp::PeerImp (id_t id, endpoint_type remote_endpoint,
|
||||
PeerFinder::Slot::ptr const& slot, ConstBufferSequence const& buffers,
|
||||
std::unique_ptr<beast::asio::ssl_bundle>&& ssl_bundle,
|
||||
OverlayImpl& overlay)
|
||||
: Child (overlay)
|
||||
, id_ (id)
|
||||
, sink_ (deprecatedLogs().journal("Peer"), makePrefix(id))
|
||||
, p_sink_ (deprecatedLogs().journal("Protocol"), makePrefix(id))
|
||||
, journal_ (sink_)
|
||||
, p_journal_ (p_sink_)
|
||||
, ssl_bundle_ (std::move(ssl_bundle))
|
||||
, socket_ (ssl_bundle_->socket)
|
||||
, stream_ (ssl_bundle_->stream)
|
||||
, strand_ (socket_.get_io_service())
|
||||
, timer_ (socket_.get_io_service())
|
||||
, remote_address_ (
|
||||
beast::IPAddressConversion::from_asio(remote_endpoint))
|
||||
, overlay_ (overlay)
|
||||
, m_inbound (true)
|
||||
, state_ (State::connected)
|
||||
, slot_ (slot)
|
||||
, validatorsConnection_(getApp().getValidators().newConnection(id))
|
||||
{
|
||||
read_buffer_.commit(boost::asio::buffer_copy(read_buffer_.prepare(
|
||||
boost::asio::buffer_size(buffers)), buffers));
|
||||
}
|
||||
|
||||
template <class Buffers>
|
||||
PeerImp::PeerImp (std::unique_ptr<beast::asio::ssl_bundle>&& ssl_bundle,
|
||||
Buffers const& buffers, PeerFinder::Slot::ptr&& slot,
|
||||
|
||||
@@ -27,6 +27,7 @@
|
||||
#include <beast/crypto/base64.h>
|
||||
#include <beast/http/rfc2616.h>
|
||||
#include <beast/module/core/text/LexicalCast.h>
|
||||
#include <beast/utility/static_initializer.h>
|
||||
#include <boost/regex.hpp>
|
||||
#include <algorithm>
|
||||
|
||||
@@ -170,7 +171,7 @@ appendHello (beast::http::message& m,
|
||||
std::vector<ProtocolVersion>
|
||||
parse_ProtocolVersions (std::string const& s)
|
||||
{
|
||||
static boost::regex const re (
|
||||
static beast::static_initializer<boost::regex> re (
|
||||
"^" // start of line
|
||||
"RTXP/" // the string "RTXP/"
|
||||
"([1-9][0-9]*)" // a number (non-zero and with no leading zeroes)
|
||||
@@ -184,7 +185,7 @@ parse_ProtocolVersions (std::string const& s)
|
||||
for (auto const& s : list)
|
||||
{
|
||||
boost::smatch m;
|
||||
if (! boost::regex_match (s, m, re))
|
||||
if (! boost::regex_match (s, m, *re))
|
||||
continue;
|
||||
int major;
|
||||
int minor;
|
||||
|
||||
@@ -52,18 +52,6 @@ struct Handler
|
||||
onAccept (Session& session,
|
||||
boost::asio::ip::tcp::endpoint remote_address) = 0;
|
||||
|
||||
/** Called when a legacy peer protocol handshake is detected.
|
||||
If the called function does not take ownership, then the
|
||||
connection is closed.
|
||||
@param buffer The unconsumed bytes in the protocol handshake
|
||||
@param ssl_bundle The active connection.
|
||||
*/
|
||||
virtual
|
||||
void
|
||||
onLegacyPeerHello (std::unique_ptr<beast::asio::ssl_bundle>&& ssl_bundle,
|
||||
boost::asio::const_buffer buffer,
|
||||
boost::asio::ip::tcp::endpoint remote_address) = 0;
|
||||
|
||||
/** Called to process a complete HTTP request.
|
||||
The handler can do one of three things:
|
||||
- Ignore the request (return default constructed What)
|
||||
|
||||
@@ -63,47 +63,6 @@ private:
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
// Detects the legacy peer protocol handshake. */
|
||||
template <class Socket, class StreamBuf, class Yield>
|
||||
static
|
||||
std::pair <boost::system::error_code, bool>
|
||||
detect_peer_protocol (Socket& socket, StreamBuf& buf, Yield yield)
|
||||
{
|
||||
std::pair<boost::system::error_code, bool> result;
|
||||
result.second = false;
|
||||
for(;;)
|
||||
{
|
||||
std::size_t const max = 6; // max bytes needed
|
||||
unsigned char data[max];
|
||||
auto const n = boost::asio::buffer_copy(
|
||||
boost::asio::buffer(data), buf.data());
|
||||
|
||||
/* Protocol messages are framed by a 6 byte header which includes
|
||||
a big-endian 4-byte length followed by a big-endian 2-byte type.
|
||||
The type for 'hello' is 1.
|
||||
*/
|
||||
if (n>=1 && data[0] != 0)
|
||||
break;
|
||||
if (n>=2 && data[1] != 0)
|
||||
break;
|
||||
if (n>=5 && data[4] != 0)
|
||||
break;
|
||||
if (n>=6)
|
||||
{
|
||||
if (data[5] == 1)
|
||||
result.second = true;
|
||||
break;
|
||||
}
|
||||
std::size_t const bytes_transferred = boost::asio::async_read(
|
||||
socket, buf.prepare(max - n), boost::asio::transfer_at_least(1),
|
||||
yield[result.first]);
|
||||
if (result.first)
|
||||
break;
|
||||
buf.commit(bytes_transferred);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
template <class ConstBufferSequence>
|
||||
SSLPeer::SSLPeer (Door& door, beast::Journal journal,
|
||||
endpoint_type remote_address, ConstBufferSequence const& buffers,
|
||||
@@ -138,26 +97,10 @@ SSLPeer::do_handshake (yield_context yield)
|
||||
cancel_timer();
|
||||
if (ec)
|
||||
return fail (ec, "handshake");
|
||||
bool const legacy = port().protocol.count("peer") > 0;
|
||||
bool const http =
|
||||
port().protocol.count("peer") > 0 ||
|
||||
//|| port().protocol.count("wss") > 0
|
||||
port().protocol.count("https") > 0;
|
||||
if (legacy)
|
||||
{
|
||||
auto const result = detect_peer_protocol(stream_, read_buf_, yield);
|
||||
if (result.first)
|
||||
return fail (result.first, "detect_legacy_handshake");
|
||||
if (result.second)
|
||||
{
|
||||
std::vector<std::uint8_t> storage (read_buf_.size());
|
||||
boost::asio::mutable_buffers_1 buffer (
|
||||
boost::asio::mutable_buffer(storage.data(), storage.size()));
|
||||
boost::asio::buffer_copy(buffer, read_buf_.data());
|
||||
return door_.server().handler().onLegacyPeerHello(
|
||||
std::move(ssl_bundle_), buffer, remote_address_);
|
||||
}
|
||||
}
|
||||
if (http)
|
||||
{
|
||||
boost::asio::spawn (strand_, std::bind (&SSLPeer::do_read,
|
||||
|
||||
@@ -106,17 +106,6 @@ ServerHandlerImp::onAccept (HTTP::Session& session,
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
ServerHandlerImp::onLegacyPeerHello (
|
||||
std::unique_ptr<beast::asio::ssl_bundle>&& ssl_bundle,
|
||||
boost::asio::const_buffer buffer,
|
||||
boost::asio::ip::tcp::endpoint remote_address)
|
||||
{
|
||||
// VFALCO TODO Inject Overlay
|
||||
getApp().overlay().onLegacyPeerHello(std::move(ssl_bundle),
|
||||
buffer, remote_address);
|
||||
}
|
||||
|
||||
auto
|
||||
ServerHandlerImp::onHandoff (HTTP::Session& session,
|
||||
std::unique_ptr <beast::asio::ssl_bundle>&& bundle,
|
||||
|
||||
@@ -84,11 +84,6 @@ private:
|
||||
onAccept (HTTP::Session& session,
|
||||
boost::asio::ip::tcp::endpoint endpoint) override;
|
||||
|
||||
void
|
||||
onLegacyPeerHello (std::unique_ptr<beast::asio::ssl_bundle>&& ssl_bundle,
|
||||
boost::asio::const_buffer buffer,
|
||||
boost::asio::ip::tcp::endpoint remote_address) override;
|
||||
|
||||
Handoff
|
||||
onHandoff (HTTP::Session& session,
|
||||
std::unique_ptr <beast::asio::ssl_bundle>&& bundle,
|
||||
|
||||
@@ -102,13 +102,6 @@ public:
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
onLegacyPeerHello (std::unique_ptr<beast::asio::ssl_bundle>&& ssl_bundle,
|
||||
boost::asio::const_buffer buffer,
|
||||
boost::asio::ip::tcp::endpoint remote_address) override
|
||||
{
|
||||
}
|
||||
|
||||
Handoff
|
||||
onHandoff (Session& session,
|
||||
std::unique_ptr <beast::asio::ssl_bundle>&& bundle,
|
||||
|
||||
Reference in New Issue
Block a user