mirror of
https://github.com/XRPLF/rippled.git
synced 2026-06-06 18:26:51 +00:00
refactors p2p handshake out of PeerImp
This commit is contained in:
@@ -41,6 +41,9 @@ public:
|
||||
virtual void
|
||||
close() = 0;
|
||||
|
||||
virtual void
|
||||
cancel() = 0;
|
||||
|
||||
// Async I/O operations
|
||||
virtual void
|
||||
async_read_some(
|
||||
@@ -102,6 +105,12 @@ public:
|
||||
stream_->lowest_layer().close();
|
||||
}
|
||||
|
||||
void
|
||||
cancel() override
|
||||
{
|
||||
stream_->lowest_layer().cancel();
|
||||
}
|
||||
|
||||
void
|
||||
async_read_some(
|
||||
boost::beast::multi_buffer::mutable_buffers_type const& buffers,
|
||||
|
||||
232
src/xrpld/overlay/detail/InboundHandshake.cpp
Normal file
232
src/xrpld/overlay/detail/InboundHandshake.cpp
Normal file
@@ -0,0 +1,232 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2012, 2013 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include <xrpld/overlay/detail/Handshake.h>
|
||||
#include <xrpld/overlay/detail/InboundHandshake.h>
|
||||
#include <xrpld/overlay/detail/OverlayImpl.h>
|
||||
#include <xrpld/overlay/detail/PeerImp.h>
|
||||
|
||||
#include <boost/beast/core/ostream.hpp>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
InboundHandshake::InboundHandshake(
|
||||
Application& app,
|
||||
std::uint32_t id,
|
||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
http_request_type&& request,
|
||||
PublicKey const& publicKey,
|
||||
ProtocolVersion protocolVersion,
|
||||
Resource::Consumer consumer,
|
||||
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||
PeerAttributes const& attributes,
|
||||
endpoint_type const& remoteEndpoint,
|
||||
OverlayImpl& overlay)
|
||||
: Child(overlay)
|
||||
, app_(app)
|
||||
, id_(id)
|
||||
, sink_(app_.logs()["Peer"], OverlayImpl::makePrefix(id))
|
||||
, journal_(sink_)
|
||||
, stream_ptr_(std::move(stream_ptr))
|
||||
, request_(std::move(request))
|
||||
, publicKey_(publicKey)
|
||||
, protocolVersion_(protocolVersion)
|
||||
, consumer_(consumer)
|
||||
, attributes_(attributes)
|
||||
, slot_(slot)
|
||||
, remoteEndpoint_(remoteEndpoint)
|
||||
, strand_(boost::asio::make_strand(stream_ptr_->get_executor()))
|
||||
{
|
||||
}
|
||||
|
||||
InboundHandshake::~InboundHandshake()
|
||||
{
|
||||
if (slot_ != nullptr)
|
||||
overlay_.peerFinder().on_closed(slot_);
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::stop()
|
||||
{
|
||||
if (!strand_.running_in_this_thread())
|
||||
return boost::asio::post(
|
||||
strand_, std::bind(&InboundHandshake::stop, shared_from_this()));
|
||||
|
||||
shutdown();
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::shutdown()
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::InboundHandshake::shutdown : strand in this thread");
|
||||
|
||||
if (!stream_ptr_->is_open() || shutdown_)
|
||||
return;
|
||||
|
||||
shutdown_ = true;
|
||||
|
||||
stream_ptr_->cancel();
|
||||
|
||||
tryAsyncShutdown();
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::tryAsyncShutdown()
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::InboundHandshake::tryAsyncShutdown : strand in this thread");
|
||||
|
||||
if (!stream_ptr_->is_open())
|
||||
return;
|
||||
|
||||
if (shutdown_ || shutdownStarted_)
|
||||
return;
|
||||
|
||||
if (ioPending_)
|
||||
return;
|
||||
|
||||
shutdownStarted_ = true;
|
||||
|
||||
return stream_ptr_->async_shutdown(boost::asio::bind_executor(
|
||||
strand_,
|
||||
std::bind(
|
||||
&InboundHandshake::onShutdown,
|
||||
shared_from_this(),
|
||||
std::placeholders::_1)));
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::onShutdown(error_code ec)
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::InboundHandshake::onShutdown : strand in this thread");
|
||||
|
||||
if (!stream_ptr_->is_open())
|
||||
return;
|
||||
|
||||
if (ec == boost::asio::error::operation_aborted)
|
||||
return;
|
||||
|
||||
if (ec)
|
||||
{
|
||||
JLOG(journal_.warn()) << "onShutdown: " << ec.message();
|
||||
}
|
||||
|
||||
stream_ptr_->close();
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::run()
|
||||
{
|
||||
if (!strand_.running_in_this_thread())
|
||||
return boost::asio::post(
|
||||
strand_, std::bind(&InboundHandshake::run, shared_from_this()));
|
||||
|
||||
// TODO: implement fail overload to handle strings
|
||||
auto const sharedValue = stream_ptr_->makeSharedValue(journal_);
|
||||
if (!sharedValue)
|
||||
return fail("makeSharedValue", boost::system::error_code{});
|
||||
|
||||
// Create the handshake response
|
||||
auto const response = makeResponse(
|
||||
!overlay_.peerFinder().config().peerPrivate,
|
||||
request_,
|
||||
overlay_.setup().public_ip,
|
||||
remoteEndpoint_.address(),
|
||||
*sharedValue,
|
||||
overlay_.setup().networkID,
|
||||
protocolVersion_,
|
||||
app_);
|
||||
|
||||
// Convert response to buffer for async_write
|
||||
auto write_buffer = std::make_shared<boost::beast::multi_buffer>();
|
||||
boost::beast::ostream(*write_buffer) << response;
|
||||
|
||||
ioPending_ = true;
|
||||
// Write the response asynchronously
|
||||
stream_ptr_->async_write(
|
||||
write_buffer->data(),
|
||||
boost::asio::bind_executor(
|
||||
strand_,
|
||||
[this, write_buffer, self = shared_from_this()](
|
||||
error_code ec, std::size_t bytes_transferred) {
|
||||
onHandshake(ec, bytes_transferred);
|
||||
}));
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::onHandshake(error_code ec, std::size_t bytes_transferred)
|
||||
{
|
||||
ioPending_ = false;
|
||||
if (!stream_ptr_->is_open())
|
||||
return;
|
||||
|
||||
if (ec == boost::asio::error::operation_aborted || shutdown_)
|
||||
return tryAsyncShutdown();
|
||||
|
||||
if (ec)
|
||||
return fail("onHandshake", ec);
|
||||
|
||||
JLOG(journal_.debug()) << "InboundHandshake completed for "
|
||||
<< remoteEndpoint_
|
||||
<< ", bytes transferred: " << bytes_transferred;
|
||||
|
||||
// Handshake successful, create the peer
|
||||
createPeer();
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::createPeer()
|
||||
{
|
||||
auto const peer = std::make_shared<PeerImp>(
|
||||
app_,
|
||||
overlay_,
|
||||
std::move(slot_),
|
||||
std::move(stream_ptr_),
|
||||
consumer_,
|
||||
protocolVersion_,
|
||||
attributes_,
|
||||
publicKey_,
|
||||
id_);
|
||||
|
||||
// Add the peer to the overlay
|
||||
overlay_.add_active(peer);
|
||||
JLOG(journal_.debug()) << "Created peer for " << remoteEndpoint_;
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::fail(std::string const& name, error_code ec)
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::InboundHandshake::fail : strand in this thread");
|
||||
|
||||
JLOG(journal_.warn()) << name << " from "
|
||||
<< toBase58(TokenType::NodePublic, publicKey_)
|
||||
<< " at " << remoteEndpoint_.address().to_string()
|
||||
<< ": " << ec.message();
|
||||
|
||||
shutdown();
|
||||
}
|
||||
|
||||
} // namespace ripple
|
||||
109
src/xrpld/overlay/detail/InboundHandshake.h
Normal file
109
src/xrpld/overlay/detail/InboundHandshake.h
Normal file
@@ -0,0 +1,109 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2012, 2013 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#ifndef RIPPLE_OVERLAY_INBOUNDHANDSHAKE_H_INCLUDED
|
||||
#define RIPPLE_OVERLAY_INBOUNDHANDSHAKE_H_INCLUDED
|
||||
|
||||
#include <xrpld/overlay/detail/Handshake.h>
|
||||
#include <xrpld/overlay/detail/OverlayImpl.h>
|
||||
#include <xrpld/overlay/detail/PeerImp.h>
|
||||
|
||||
#include <xrpl/server/Handoff.h>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
/** Manages an inbound peer handshake. */
|
||||
class InboundHandshake : public OverlayImpl::Child,
|
||||
public std::enable_shared_from_this<InboundHandshake>
|
||||
{
|
||||
using error_code = boost::system::error_code;
|
||||
using endpoint_type = boost::asio::ip::tcp::endpoint;
|
||||
|
||||
private:
|
||||
Application& app_;
|
||||
std::uint32_t const id_;
|
||||
beast::WrappedSink sink_;
|
||||
beast::Journal const journal_;
|
||||
std::unique_ptr<StreamInterface> stream_ptr_;
|
||||
http_request_type request_;
|
||||
PublicKey publicKey_;
|
||||
ProtocolVersion protocolVersion_;
|
||||
Resource::Consumer consumer_;
|
||||
PeerAttributes attributes_;
|
||||
std::shared_ptr<PeerFinder::Slot> slot_;
|
||||
endpoint_type remoteEndpoint_;
|
||||
boost::asio::strand<boost::asio::executor> strand_;
|
||||
bool shutdown_ = false;
|
||||
bool ioPending_ = false;
|
||||
bool shutdownStarted_ = false;
|
||||
|
||||
public:
|
||||
InboundHandshake(
|
||||
Application& app,
|
||||
std::uint32_t id,
|
||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
http_request_type&& request,
|
||||
PublicKey const& public_key,
|
||||
ProtocolVersion protocol_version,
|
||||
Resource::Consumer consumer,
|
||||
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||
PeerAttributes const& attributes,
|
||||
endpoint_type const& remote_endpoint,
|
||||
OverlayImpl& overlay);
|
||||
|
||||
~InboundHandshake();
|
||||
|
||||
void
|
||||
stop() override;
|
||||
|
||||
void
|
||||
run();
|
||||
|
||||
private:
|
||||
void
|
||||
setTimer();
|
||||
|
||||
void
|
||||
onTimer(error_code ec);
|
||||
|
||||
void
|
||||
cancelTimer();
|
||||
|
||||
void
|
||||
shutdown();
|
||||
|
||||
void
|
||||
tryAsyncShutdown();
|
||||
|
||||
void
|
||||
onShutdown(error_code ec);
|
||||
|
||||
void
|
||||
onHandshake(error_code ec, std::size_t bytes_transferred);
|
||||
|
||||
void
|
||||
createPeer();
|
||||
|
||||
void
|
||||
fail(std::string const& name, error_code ec);
|
||||
};
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
#endif
|
||||
@@ -25,6 +25,7 @@
|
||||
#include <xrpld/app/rdb/Wallet.h>
|
||||
#include <xrpld/overlay/Cluster.h>
|
||||
#include <xrpld/overlay/detail/ConnectAttempt.h>
|
||||
#include <xrpld/overlay/detail/InboundHandshake.h>
|
||||
#include <xrpld/overlay/detail/PeerImp.h>
|
||||
#include <xrpld/overlay/detail/TrafficCount.h>
|
||||
#include <xrpld/overlay/detail/Tuning.h>
|
||||
@@ -173,9 +174,7 @@ OverlayImpl::onHandoff(
|
||||
beast::Journal journal(sink);
|
||||
|
||||
Handoff handoff;
|
||||
if (processRequest(request, handoff))
|
||||
return handoff;
|
||||
if (!isPeerUpgrade(request))
|
||||
if (processRequest(request, handoff) || !isPeerUpgrade(request))
|
||||
return handoff;
|
||||
|
||||
handoff.moved = true;
|
||||
@@ -196,42 +195,11 @@ OverlayImpl::onHandoff(
|
||||
if (consumer.disconnect(journal))
|
||||
return handoff;
|
||||
|
||||
auto const [slot, result] = m_peerFinder->new_inbound_slot(
|
||||
beast::IPAddressConversion::from_asio(local_endpoint),
|
||||
beast::IPAddressConversion::from_asio(remote_endpoint));
|
||||
|
||||
if (slot == nullptr)
|
||||
{
|
||||
// connection refused either IP limit exceeded or self-connect
|
||||
handoff.moved = false;
|
||||
JLOG(journal.debug())
|
||||
<< "Peer " << remote_endpoint << " refused, " << to_string(result);
|
||||
return handoff;
|
||||
}
|
||||
|
||||
// Validate HTTP request
|
||||
|
||||
{
|
||||
auto const types = beast::rfc2616::split_commas(request["Connect-As"]);
|
||||
if (std::find_if(types.begin(), types.end(), [](std::string const& s) {
|
||||
return boost::iequals(s, "peer");
|
||||
}) == types.end())
|
||||
{
|
||||
handoff.moved = false;
|
||||
handoff.response =
|
||||
makeRedirectResponse(slot, request, remote_endpoint.address());
|
||||
handoff.keep_alive = beast::rfc2616::is_keep_alive(request);
|
||||
return handoff;
|
||||
}
|
||||
}
|
||||
|
||||
auto const negotiatedVersion = negotiateProtocolVersion(request["Upgrade"]);
|
||||
if (!negotiatedVersion)
|
||||
{
|
||||
m_peerFinder->on_closed(slot);
|
||||
handoff.moved = false;
|
||||
handoff.response = makeErrorResponse(
|
||||
slot,
|
||||
request,
|
||||
remote_endpoint.address(),
|
||||
"Unable to agree on a protocol version");
|
||||
@@ -244,17 +212,42 @@ OverlayImpl::onHandoff(
|
||||
auto const sharedValue = stream_ptr->makeSharedValue(journal);
|
||||
if (!sharedValue)
|
||||
{
|
||||
m_peerFinder->on_closed(slot);
|
||||
handoff.moved = false;
|
||||
handoff.response = makeErrorResponse(
|
||||
slot,
|
||||
request,
|
||||
remote_endpoint.address(),
|
||||
"Incorrect security cookie");
|
||||
request, remote_endpoint.address(), "Incorrect security cookie");
|
||||
handoff.keep_alive = false;
|
||||
return handoff;
|
||||
}
|
||||
|
||||
// Validate HTTP request
|
||||
|
||||
{
|
||||
auto const types = beast::rfc2616::split_commas(request["Connect-As"]);
|
||||
if (std::find_if(types.begin(), types.end(), [](std::string const& s) {
|
||||
return boost::iequals(s, "peer");
|
||||
}) == types.end())
|
||||
{
|
||||
handoff.moved = false;
|
||||
handoff.response = makeErrorResponse(
|
||||
request, remote_endpoint.address(), "Invalid Peer Type");
|
||||
handoff.keep_alive = beast::rfc2616::is_keep_alive(request);
|
||||
return handoff;
|
||||
}
|
||||
}
|
||||
|
||||
auto const [slot, result] = m_peerFinder->new_inbound_slot(
|
||||
beast::IPAddressConversion::from_asio(local_endpoint),
|
||||
beast::IPAddressConversion::from_asio(remote_endpoint));
|
||||
|
||||
if (slot == nullptr)
|
||||
{
|
||||
// connection refused either IP limit exceeded or self-connect
|
||||
handoff.moved = false;
|
||||
JLOG(journal.debug())
|
||||
<< "Peer " << remote_endpoint << " refused, " << to_string(result);
|
||||
return handoff;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
auto publicKey = verifyHandshake(
|
||||
@@ -269,10 +262,12 @@ OverlayImpl::onHandoff(
|
||||
// The node gets a reserved slot if it is in our cluster
|
||||
// or if it has a reservation.
|
||||
bool const reserved =
|
||||
static_cast<bool>(app_.cluster().member(publicKey)) ||
|
||||
app_.cluster().member(publicKey).has_value() ||
|
||||
app_.peerReservations().contains(publicKey);
|
||||
|
||||
auto const result =
|
||||
m_peerFinder->activate(slot, publicKey, reserved);
|
||||
|
||||
if (result != PeerFinder::Result::success)
|
||||
{
|
||||
m_peerFinder->on_closed(slot);
|
||||
@@ -290,7 +285,7 @@ OverlayImpl::onHandoff(
|
||||
auto const attributes =
|
||||
extractPeerAttributes(request, app_.config(), true);
|
||||
|
||||
auto const peer = std::make_shared<PeerImp>(
|
||||
auto const p = std::make_shared<InboundHandshake>(
|
||||
app_,
|
||||
id,
|
||||
slot,
|
||||
@@ -300,23 +295,13 @@ OverlayImpl::onHandoff(
|
||||
consumer,
|
||||
std::move(stream_ptr),
|
||||
attributes,
|
||||
remote_endpoint,
|
||||
*this);
|
||||
{
|
||||
// As we are not on the strand, run() must be called
|
||||
// while holding the lock, otherwise new I/O can be
|
||||
// queued after a call to stop().
|
||||
std::lock_guard<decltype(mutex_)> lock(mutex_);
|
||||
{
|
||||
auto const result = m_peers.emplace(peer->slot(), peer);
|
||||
XRPL_ASSERT(
|
||||
result.second,
|
||||
"ripple::OverlayImpl::onHandoff : peer is inserted");
|
||||
(void)result.second;
|
||||
}
|
||||
list_.emplace(peer.get(), peer);
|
||||
|
||||
peer->run();
|
||||
}
|
||||
std::lock_guard lock(mutex_);
|
||||
list_.emplace(p.get(), p);
|
||||
p->run();
|
||||
|
||||
handoff.moved = true;
|
||||
return handoff;
|
||||
}
|
||||
@@ -327,8 +312,8 @@ OverlayImpl::onHandoff(
|
||||
|
||||
m_peerFinder->on_closed(slot);
|
||||
handoff.moved = false;
|
||||
handoff.response = makeErrorResponse(
|
||||
slot, request, remote_endpoint.address(), e.what());
|
||||
handoff.response =
|
||||
makeErrorResponse(request, remote_endpoint.address(), e.what());
|
||||
handoff.keep_alive = false;
|
||||
return handoff;
|
||||
}
|
||||
@@ -382,7 +367,6 @@ OverlayImpl::makeRedirectResponse(
|
||||
|
||||
std::shared_ptr<Writer>
|
||||
OverlayImpl::makeErrorResponse(
|
||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
http_request_type const& request,
|
||||
address_type remote_address,
|
||||
std::string text)
|
||||
|
||||
@@ -465,7 +465,6 @@ private:
|
||||
|
||||
std::shared_ptr<Writer>
|
||||
makeErrorResponse(
|
||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
http_request_type const& request,
|
||||
address_type remote_address,
|
||||
std::string msg);
|
||||
|
||||
@@ -150,15 +150,14 @@ extractPeerAttributes(
|
||||
|
||||
PeerImp::PeerImp(
|
||||
Application& app,
|
||||
id_t id,
|
||||
OverlayImpl& overlay,
|
||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
http_request_type&& request,
|
||||
PublicKey const& publicKey,
|
||||
ProtocolVersion protocol,
|
||||
Resource::Consumer consumer,
|
||||
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||
Resource::Consumer consumer,
|
||||
ProtocolVersion protocol,
|
||||
PeerAttributes const& attributes,
|
||||
OverlayImpl& overlay)
|
||||
PublicKey const& publicKey,
|
||||
id_t id)
|
||||
: Child(overlay)
|
||||
, app_(app)
|
||||
, id_(id)
|
||||
@@ -182,7 +181,6 @@ PeerImp::PeerImp(
|
||||
, usage_(consumer)
|
||||
, fee_{Resource::feeTrivialPeer, ""}
|
||||
, slot_(slot)
|
||||
, request_(std::move(request))
|
||||
, attributes_(attributes)
|
||||
, ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
|
||||
{
|
||||
@@ -831,15 +829,6 @@ PeerImp::doAccept()
|
||||
read_buffer_.size() == 0,
|
||||
"ripple::PeerImp::doAccept : empty read buffer");
|
||||
|
||||
JLOG(journal_.debug()) << "doAccept: " << remote_address_;
|
||||
|
||||
auto const sharedValue = stream_ptr_->makeSharedValue(journal_);
|
||||
|
||||
// This shouldn't fail since we already computed
|
||||
// the shared value successfully in OverlayImpl
|
||||
if (!sharedValue)
|
||||
return fail("makeSharedValue: Unexpected failure");
|
||||
|
||||
JLOG(journal_.info()) << "Protocol: " << to_string(protocol_);
|
||||
JLOG(journal_.info()) << "Public Key: "
|
||||
<< toBase58(TokenType::NodePublic, publicKey_);
|
||||
@@ -853,43 +842,7 @@ PeerImp::doAccept()
|
||||
JLOG(journal_.info()) << "Cluster name: " << *member;
|
||||
}
|
||||
|
||||
overlay_.activate(shared_from_this());
|
||||
|
||||
// XXX Set timer: connection is in grace period to be useful.
|
||||
// XXX Set timer: connection idle (idle may vary depending on connection
|
||||
// type.)
|
||||
|
||||
auto write_buffer = std::make_shared<boost::beast::multi_buffer>();
|
||||
|
||||
boost::beast::ostream(*write_buffer) << makeResponse(
|
||||
!overlay_.peerFinder().config().peerPrivate,
|
||||
request_,
|
||||
overlay_.setup().public_ip,
|
||||
remote_address_.address(),
|
||||
*sharedValue,
|
||||
overlay_.setup().networkID,
|
||||
protocol_,
|
||||
app_);
|
||||
|
||||
// Write the whole buffer and only start protocol when that's done.
|
||||
stream_ptr_->async_write(
|
||||
write_buffer->data(),
|
||||
[this, write_buffer, self = shared_from_this()](
|
||||
error_code ec, std::size_t bytes_transferred) {
|
||||
// Post completion to the strand to ensure thread safety
|
||||
boost::asio::post(
|
||||
strand_, [this, write_buffer, self, ec, bytes_transferred]() {
|
||||
if (!socketOpen())
|
||||
return;
|
||||
if (ec == boost::asio::error::operation_aborted)
|
||||
return;
|
||||
if (ec)
|
||||
return fail("onWriteResponse", ec);
|
||||
if (write_buffer->size() == bytes_transferred)
|
||||
return doProtocolStart();
|
||||
return fail("Failed to write header");
|
||||
});
|
||||
});
|
||||
doProtocolStart();
|
||||
}
|
||||
|
||||
std::string
|
||||
|
||||
@@ -100,53 +100,6 @@ private:
|
||||
boost::asio::basic_waitable_timer<std::chrono::steady_clock>;
|
||||
using Compressed = compression::Compressed;
|
||||
|
||||
Application& app_;
|
||||
id_t const id_;
|
||||
beast::WrappedSink sink_;
|
||||
beast::WrappedSink p_sink_;
|
||||
beast::Journal const journal_;
|
||||
beast::Journal const p_journal_;
|
||||
std::unique_ptr<StreamInterface> stream_ptr_;
|
||||
boost::asio::strand<boost::asio::executor> strand_;
|
||||
waitable_timer timer_;
|
||||
|
||||
// Updated at each stage of the connection process to reflect
|
||||
// the current conditions as closely as possible.
|
||||
beast::IP::Endpoint const remote_address_;
|
||||
|
||||
// These are up here to prevent warnings about order of initializations
|
||||
//
|
||||
OverlayImpl& overlay_;
|
||||
bool const inbound_;
|
||||
|
||||
// Protocol version to use for this link
|
||||
ProtocolVersion protocol_;
|
||||
|
||||
std::atomic<Tracking> tracking_;
|
||||
clock_type::time_point trackingTime_;
|
||||
bool detaching_ = false;
|
||||
// Node public key of peer.
|
||||
PublicKey const publicKey_;
|
||||
std::string name_;
|
||||
std::shared_mutex mutable nameMutex_;
|
||||
|
||||
// The indices of the smallest and largest ledgers this peer has available
|
||||
//
|
||||
LedgerIndex minLedger_ = 0;
|
||||
LedgerIndex maxLedger_ = 0;
|
||||
uint256 closedLedgerHash_;
|
||||
uint256 previousLedgerHash_;
|
||||
|
||||
boost::circular_buffer<uint256> recentLedgers_{128};
|
||||
boost::circular_buffer<uint256> recentTxSets_{128};
|
||||
|
||||
std::optional<std::chrono::milliseconds> latency_;
|
||||
std::optional<std::uint32_t> lastPingSeq_;
|
||||
clock_type::time_point lastPingTime_;
|
||||
clock_type::time_point const creationTime_;
|
||||
|
||||
reduce_relay::Squelch<UptimeClock> squelch_;
|
||||
|
||||
// Notes on thread locking:
|
||||
//
|
||||
// During an audit it was noted that some member variables that looked
|
||||
@@ -194,31 +147,6 @@ private:
|
||||
}
|
||||
};
|
||||
|
||||
std::mutex mutable recentLock_;
|
||||
protocol::TMStatusChange last_status_;
|
||||
Resource::Consumer usage_;
|
||||
ChargeWithContext fee_;
|
||||
std::shared_ptr<PeerFinder::Slot> const slot_;
|
||||
boost::beast::multi_buffer read_buffer_;
|
||||
http_request_type request_;
|
||||
PeerAttributes const attributes_;
|
||||
std::queue<std::shared_ptr<Message>> send_queue_;
|
||||
bool gracefulClose_ = false;
|
||||
int large_sendq_ = 0;
|
||||
std::unique_ptr<LoadEvent> load_event_;
|
||||
// The highest sequence of each PublisherList that has
|
||||
// been sent to or received from this peer.
|
||||
hash_map<PublicKey, std::size_t> publisherListSequences_;
|
||||
|
||||
// Queue of transactions' hashes that have not been
|
||||
// relayed. The hashes are sent once a second to a peer
|
||||
// and the peer requests missing transactions from the node.
|
||||
hash_set<uint256> txQueue_;
|
||||
|
||||
LedgerReplayMsgHandler ledgerReplayMsgHandler_;
|
||||
|
||||
friend class OverlayImpl;
|
||||
|
||||
class Metrics
|
||||
{
|
||||
public:
|
||||
@@ -252,6 +180,79 @@ private:
|
||||
Metrics recv;
|
||||
} metrics_;
|
||||
|
||||
Application& app_;
|
||||
id_t const id_;
|
||||
|
||||
beast::WrappedSink sink_;
|
||||
beast::WrappedSink p_sink_;
|
||||
beast::Journal const journal_;
|
||||
beast::Journal const p_journal_;
|
||||
|
||||
std::unique_ptr<StreamInterface> stream_ptr_;
|
||||
boost::asio::strand<boost::asio::executor> strand_;
|
||||
waitable_timer timer_;
|
||||
|
||||
// Updated at each stage of the connection process to reflect
|
||||
// the current conditions as closely as possible.
|
||||
beast::IP::Endpoint const remote_address_;
|
||||
|
||||
// These are up here to prevent warnings about order of initializations
|
||||
//
|
||||
OverlayImpl& overlay_;
|
||||
bool const inbound_;
|
||||
|
||||
// Protocol version to use for this link
|
||||
ProtocolVersion protocol_;
|
||||
|
||||
std::atomic<Tracking> tracking_;
|
||||
clock_type::time_point trackingTime_;
|
||||
bool detaching_ = false;
|
||||
// Node public key of peer.
|
||||
PublicKey const publicKey_;
|
||||
std::string name_;
|
||||
std::shared_mutex mutable nameMutex_;
|
||||
|
||||
// The indices of the smallest and largest ledgers this peer has available
|
||||
//
|
||||
LedgerIndex minLedger_ = 0;
|
||||
LedgerIndex maxLedger_ = 0;
|
||||
uint256 closedLedgerHash_;
|
||||
uint256 previousLedgerHash_;
|
||||
|
||||
boost::circular_buffer<uint256> recentLedgers_{128};
|
||||
boost::circular_buffer<uint256> recentTxSets_{128};
|
||||
|
||||
std::optional<std::chrono::milliseconds> latency_;
|
||||
std::optional<std::uint32_t> lastPingSeq_;
|
||||
clock_type::time_point lastPingTime_;
|
||||
clock_type::time_point const creationTime_;
|
||||
|
||||
reduce_relay::Squelch<UptimeClock> squelch_;
|
||||
|
||||
std::mutex mutable recentLock_;
|
||||
protocol::TMStatusChange last_status_;
|
||||
Resource::Consumer usage_;
|
||||
ChargeWithContext fee_;
|
||||
std::shared_ptr<PeerFinder::Slot> const slot_;
|
||||
boost::beast::multi_buffer read_buffer_;
|
||||
PeerAttributes const attributes_;
|
||||
std::queue<std::shared_ptr<Message>> send_queue_;
|
||||
bool gracefulClose_ = false;
|
||||
int large_sendq_ = 0;
|
||||
std::unique_ptr<LoadEvent> load_event_;
|
||||
// The highest sequence of each PublisherList that has
|
||||
// been sent to or received from this peer.
|
||||
hash_map<PublicKey, std::size_t> publisherListSequences_;
|
||||
|
||||
// Queue of transactions' hashes that have not been
|
||||
// relayed. The hashes are sent once a second to a peer
|
||||
// and the peer requests missing transactions from the node.
|
||||
hash_set<uint256> txQueue_;
|
||||
|
||||
LedgerReplayMsgHandler ledgerReplayMsgHandler_;
|
||||
|
||||
friend class OverlayImpl;
|
||||
|
||||
public:
|
||||
PeerImp(PeerImp const&) = delete;
|
||||
PeerImp&
|
||||
@@ -260,25 +261,23 @@ public:
|
||||
/** Create an active incoming peer from an established ssl connection. */
|
||||
PeerImp(
|
||||
Application& app,
|
||||
id_t id,
|
||||
OverlayImpl& overlay,
|
||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
http_request_type&& request,
|
||||
PublicKey const& publicKey,
|
||||
ProtocolVersion protocol,
|
||||
Resource::Consumer consumer,
|
||||
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||
Resource::Consumer consumer,
|
||||
ProtocolVersion protocol,
|
||||
PeerAttributes const& attributes,
|
||||
OverlayImpl& overlay);
|
||||
PublicKey const& publicKey,
|
||||
id_t id);
|
||||
|
||||
/** Create outgoing, handshaked peer. */
|
||||
// VFALCO legacyPublicKey should be implied by the Slot
|
||||
template <class Buffers>
|
||||
PeerImp(
|
||||
Application& app,
|
||||
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||
Buffers const& buffers,
|
||||
std::shared_ptr<PeerFinder::Slot>&& slot,
|
||||
Resource::Consumer usage,
|
||||
Resource::Consumer consumer,
|
||||
PublicKey const& publicKey,
|
||||
ProtocolVersion protocol,
|
||||
id_t id,
|
||||
|
||||
Reference in New Issue
Block a user