From 2dc49e30c4ea00282423b3b13d77c4c21d2c957f Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Thu, 4 Sep 2025 19:43:50 +0200 Subject: [PATCH] refactors p2p handshake out of PeerImp --- include/xrpl/server/detail/StreamInterface.h | 9 + src/xrpld/overlay/detail/InboundHandshake.cpp | 232 ++++++++++++++++++ src/xrpld/overlay/detail/InboundHandshake.h | 109 ++++++++ src/xrpld/overlay/detail/OverlayImpl.cpp | 102 ++++---- src/xrpld/overlay/detail/OverlayImpl.h | 1 - src/xrpld/overlay/detail/PeerImp.cpp | 59 +---- src/xrpld/overlay/detail/PeerImp.h | 159 ++++++------ 7 files changed, 478 insertions(+), 193 deletions(-) create mode 100644 src/xrpld/overlay/detail/InboundHandshake.cpp create mode 100644 src/xrpld/overlay/detail/InboundHandshake.h diff --git a/include/xrpl/server/detail/StreamInterface.h b/include/xrpl/server/detail/StreamInterface.h index 23a73afdb4..d8dc43bb1a 100644 --- a/include/xrpl/server/detail/StreamInterface.h +++ b/include/xrpl/server/detail/StreamInterface.h @@ -41,6 +41,9 @@ public: virtual void close() = 0; + virtual void + cancel() = 0; + // Async I/O operations virtual void async_read_some( @@ -102,6 +105,12 @@ public: stream_->lowest_layer().close(); } + void + cancel() override + { + stream_->lowest_layer().cancel(); + } + void async_read_some( boost::beast::multi_buffer::mutable_buffers_type const& buffers, diff --git a/src/xrpld/overlay/detail/InboundHandshake.cpp b/src/xrpld/overlay/detail/InboundHandshake.cpp new file mode 100644 index 0000000000..30721b2230 --- /dev/null +++ b/src/xrpld/overlay/detail/InboundHandshake.cpp @@ -0,0 +1,232 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include +#include + +#include + +namespace ripple { + +InboundHandshake::InboundHandshake( + Application& app, + std::uint32_t id, + std::shared_ptr const& slot, + http_request_type&& request, + PublicKey const& publicKey, + ProtocolVersion protocolVersion, + Resource::Consumer consumer, + std::unique_ptr&& stream_ptr, + PeerAttributes const& attributes, + endpoint_type const& remoteEndpoint, + OverlayImpl& overlay) + : Child(overlay) + , app_(app) + , id_(id) + , sink_(app_.logs()["Peer"], OverlayImpl::makePrefix(id)) + , journal_(sink_) + , stream_ptr_(std::move(stream_ptr)) + , request_(std::move(request)) + , publicKey_(publicKey) + , protocolVersion_(protocolVersion) + , consumer_(consumer) + , attributes_(attributes) + , slot_(slot) + , remoteEndpoint_(remoteEndpoint) + , strand_(boost::asio::make_strand(stream_ptr_->get_executor())) +{ +} + +InboundHandshake::~InboundHandshake() +{ + if (slot_ != nullptr) + overlay_.peerFinder().on_closed(slot_); +} + +void +InboundHandshake::stop() +{ + if (!strand_.running_in_this_thread()) + return boost::asio::post( + strand_, std::bind(&InboundHandshake::stop, shared_from_this())); + + shutdown(); +} + +void +InboundHandshake::shutdown() +{ + XRPL_ASSERT( + strand_.running_in_this_thread(), + "ripple::InboundHandshake::shutdown : strand in this thread"); + + if (!stream_ptr_->is_open() || shutdown_) + return; + + shutdown_ = true; + + stream_ptr_->cancel(); + + tryAsyncShutdown(); +} + +void +InboundHandshake::tryAsyncShutdown() +{ + XRPL_ASSERT( + strand_.running_in_this_thread(), + "ripple::InboundHandshake::tryAsyncShutdown : strand in this thread"); + + if (!stream_ptr_->is_open()) + return; + + if (shutdown_ || shutdownStarted_) + return; + + if (ioPending_) + return; + + shutdownStarted_ = true; + + return stream_ptr_->async_shutdown(boost::asio::bind_executor( + strand_, + std::bind( + &InboundHandshake::onShutdown, + shared_from_this(), + std::placeholders::_1))); +} + +void +InboundHandshake::onShutdown(error_code ec) +{ + XRPL_ASSERT( + strand_.running_in_this_thread(), + "ripple::InboundHandshake::onShutdown : strand in this thread"); + + if (!stream_ptr_->is_open()) + return; + + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec) + { + JLOG(journal_.warn()) << "onShutdown: " << ec.message(); + } + + stream_ptr_->close(); +} + +void +InboundHandshake::run() +{ + if (!strand_.running_in_this_thread()) + return boost::asio::post( + strand_, std::bind(&InboundHandshake::run, shared_from_this())); + + // TODO: implement fail overload to handle strings + auto const sharedValue = stream_ptr_->makeSharedValue(journal_); + if (!sharedValue) + return fail("makeSharedValue", boost::system::error_code{}); + + // Create the handshake response + auto const response = makeResponse( + !overlay_.peerFinder().config().peerPrivate, + request_, + overlay_.setup().public_ip, + remoteEndpoint_.address(), + *sharedValue, + overlay_.setup().networkID, + protocolVersion_, + app_); + + // Convert response to buffer for async_write + auto write_buffer = std::make_shared(); + boost::beast::ostream(*write_buffer) << response; + + ioPending_ = true; + // Write the response asynchronously + stream_ptr_->async_write( + write_buffer->data(), + boost::asio::bind_executor( + strand_, + [this, write_buffer, self = shared_from_this()]( + error_code ec, std::size_t bytes_transferred) { + onHandshake(ec, bytes_transferred); + })); +} + +void +InboundHandshake::onHandshake(error_code ec, std::size_t bytes_transferred) +{ + ioPending_ = false; + if (!stream_ptr_->is_open()) + return; + + if (ec == boost::asio::error::operation_aborted || shutdown_) + return tryAsyncShutdown(); + + if (ec) + return fail("onHandshake", ec); + + JLOG(journal_.debug()) << "InboundHandshake completed for " + << remoteEndpoint_ + << ", bytes transferred: " << bytes_transferred; + + // Handshake successful, create the peer + createPeer(); +} + +void +InboundHandshake::createPeer() +{ + auto const peer = std::make_shared( + app_, + overlay_, + std::move(slot_), + std::move(stream_ptr_), + consumer_, + protocolVersion_, + attributes_, + publicKey_, + id_); + + // Add the peer to the overlay + overlay_.add_active(peer); + JLOG(journal_.debug()) << "Created peer for " << remoteEndpoint_; +} + +void +InboundHandshake::fail(std::string const& name, error_code ec) +{ + XRPL_ASSERT( + strand_.running_in_this_thread(), + "ripple::InboundHandshake::fail : strand in this thread"); + + JLOG(journal_.warn()) << name << " from " + << toBase58(TokenType::NodePublic, publicKey_) + << " at " << remoteEndpoint_.address().to_string() + << ": " << ec.message(); + + shutdown(); +} + +} // namespace ripple \ No newline at end of file diff --git a/src/xrpld/overlay/detail/InboundHandshake.h b/src/xrpld/overlay/detail/InboundHandshake.h new file mode 100644 index 0000000000..1753eef154 --- /dev/null +++ b/src/xrpld/overlay/detail/InboundHandshake.h @@ -0,0 +1,109 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_OVERLAY_INBOUNDHANDSHAKE_H_INCLUDED +#define RIPPLE_OVERLAY_INBOUNDHANDSHAKE_H_INCLUDED + +#include +#include +#include + +#include + +namespace ripple { + +/** Manages an inbound peer handshake. */ +class InboundHandshake : public OverlayImpl::Child, + public std::enable_shared_from_this +{ + using error_code = boost::system::error_code; + using endpoint_type = boost::asio::ip::tcp::endpoint; + +private: + Application& app_; + std::uint32_t const id_; + beast::WrappedSink sink_; + beast::Journal const journal_; + std::unique_ptr stream_ptr_; + http_request_type request_; + PublicKey publicKey_; + ProtocolVersion protocolVersion_; + Resource::Consumer consumer_; + PeerAttributes attributes_; + std::shared_ptr slot_; + endpoint_type remoteEndpoint_; + boost::asio::strand strand_; + bool shutdown_ = false; + bool ioPending_ = false; + bool shutdownStarted_ = false; + +public: + InboundHandshake( + Application& app, + std::uint32_t id, + std::shared_ptr const& slot, + http_request_type&& request, + PublicKey const& public_key, + ProtocolVersion protocol_version, + Resource::Consumer consumer, + std::unique_ptr&& stream_ptr, + PeerAttributes const& attributes, + endpoint_type const& remote_endpoint, + OverlayImpl& overlay); + + ~InboundHandshake(); + + void + stop() override; + + void + run(); + +private: + void + setTimer(); + + void + onTimer(error_code ec); + + void + cancelTimer(); + + void + shutdown(); + + void + tryAsyncShutdown(); + + void + onShutdown(error_code ec); + + void + onHandshake(error_code ec, std::size_t bytes_transferred); + + void + createPeer(); + + void + fail(std::string const& name, error_code ec); +}; + +} // namespace ripple + +#endif diff --git a/src/xrpld/overlay/detail/OverlayImpl.cpp b/src/xrpld/overlay/detail/OverlayImpl.cpp index 13a0c74b2c..a4562f640c 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.cpp +++ b/src/xrpld/overlay/detail/OverlayImpl.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -173,9 +174,7 @@ OverlayImpl::onHandoff( beast::Journal journal(sink); Handoff handoff; - if (processRequest(request, handoff)) - return handoff; - if (!isPeerUpgrade(request)) + if (processRequest(request, handoff) || !isPeerUpgrade(request)) return handoff; handoff.moved = true; @@ -196,42 +195,11 @@ OverlayImpl::onHandoff( if (consumer.disconnect(journal)) return handoff; - auto const [slot, result] = m_peerFinder->new_inbound_slot( - beast::IPAddressConversion::from_asio(local_endpoint), - beast::IPAddressConversion::from_asio(remote_endpoint)); - - if (slot == nullptr) - { - // connection refused either IP limit exceeded or self-connect - handoff.moved = false; - JLOG(journal.debug()) - << "Peer " << remote_endpoint << " refused, " << to_string(result); - return handoff; - } - - // Validate HTTP request - - { - auto const types = beast::rfc2616::split_commas(request["Connect-As"]); - if (std::find_if(types.begin(), types.end(), [](std::string const& s) { - return boost::iequals(s, "peer"); - }) == types.end()) - { - handoff.moved = false; - handoff.response = - makeRedirectResponse(slot, request, remote_endpoint.address()); - handoff.keep_alive = beast::rfc2616::is_keep_alive(request); - return handoff; - } - } - auto const negotiatedVersion = negotiateProtocolVersion(request["Upgrade"]); if (!negotiatedVersion) { - m_peerFinder->on_closed(slot); handoff.moved = false; handoff.response = makeErrorResponse( - slot, request, remote_endpoint.address(), "Unable to agree on a protocol version"); @@ -244,17 +212,42 @@ OverlayImpl::onHandoff( auto const sharedValue = stream_ptr->makeSharedValue(journal); if (!sharedValue) { - m_peerFinder->on_closed(slot); handoff.moved = false; handoff.response = makeErrorResponse( - slot, - request, - remote_endpoint.address(), - "Incorrect security cookie"); + request, remote_endpoint.address(), "Incorrect security cookie"); handoff.keep_alive = false; return handoff; } + // Validate HTTP request + + { + auto const types = beast::rfc2616::split_commas(request["Connect-As"]); + if (std::find_if(types.begin(), types.end(), [](std::string const& s) { + return boost::iequals(s, "peer"); + }) == types.end()) + { + handoff.moved = false; + handoff.response = makeErrorResponse( + request, remote_endpoint.address(), "Invalid Peer Type"); + handoff.keep_alive = beast::rfc2616::is_keep_alive(request); + return handoff; + } + } + + auto const [slot, result] = m_peerFinder->new_inbound_slot( + beast::IPAddressConversion::from_asio(local_endpoint), + beast::IPAddressConversion::from_asio(remote_endpoint)); + + if (slot == nullptr) + { + // connection refused either IP limit exceeded or self-connect + handoff.moved = false; + JLOG(journal.debug()) + << "Peer " << remote_endpoint << " refused, " << to_string(result); + return handoff; + } + try { auto publicKey = verifyHandshake( @@ -269,10 +262,12 @@ OverlayImpl::onHandoff( // The node gets a reserved slot if it is in our cluster // or if it has a reservation. bool const reserved = - static_cast(app_.cluster().member(publicKey)) || + app_.cluster().member(publicKey).has_value() || app_.peerReservations().contains(publicKey); + auto const result = m_peerFinder->activate(slot, publicKey, reserved); + if (result != PeerFinder::Result::success) { m_peerFinder->on_closed(slot); @@ -290,7 +285,7 @@ OverlayImpl::onHandoff( auto const attributes = extractPeerAttributes(request, app_.config(), true); - auto const peer = std::make_shared( + auto const p = std::make_shared( app_, id, slot, @@ -300,23 +295,13 @@ OverlayImpl::onHandoff( consumer, std::move(stream_ptr), attributes, + remote_endpoint, *this); - { - // As we are not on the strand, run() must be called - // while holding the lock, otherwise new I/O can be - // queued after a call to stop(). - std::lock_guard lock(mutex_); - { - auto const result = m_peers.emplace(peer->slot(), peer); - XRPL_ASSERT( - result.second, - "ripple::OverlayImpl::onHandoff : peer is inserted"); - (void)result.second; - } - list_.emplace(peer.get(), peer); - peer->run(); - } + std::lock_guard lock(mutex_); + list_.emplace(p.get(), p); + p->run(); + handoff.moved = true; return handoff; } @@ -327,8 +312,8 @@ OverlayImpl::onHandoff( m_peerFinder->on_closed(slot); handoff.moved = false; - handoff.response = makeErrorResponse( - slot, request, remote_endpoint.address(), e.what()); + handoff.response = + makeErrorResponse(request, remote_endpoint.address(), e.what()); handoff.keep_alive = false; return handoff; } @@ -382,7 +367,6 @@ OverlayImpl::makeRedirectResponse( std::shared_ptr OverlayImpl::makeErrorResponse( - std::shared_ptr const& slot, http_request_type const& request, address_type remote_address, std::string text) diff --git a/src/xrpld/overlay/detail/OverlayImpl.h b/src/xrpld/overlay/detail/OverlayImpl.h index b4ea3307ec..6368daafe5 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.h +++ b/src/xrpld/overlay/detail/OverlayImpl.h @@ -465,7 +465,6 @@ private: std::shared_ptr makeErrorResponse( - std::shared_ptr const& slot, http_request_type const& request, address_type remote_address, std::string msg); diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index 0479d8209d..48641fa34b 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -150,15 +150,14 @@ extractPeerAttributes( PeerImp::PeerImp( Application& app, - id_t id, + OverlayImpl& overlay, std::shared_ptr const& slot, - http_request_type&& request, - PublicKey const& publicKey, - ProtocolVersion protocol, - Resource::Consumer consumer, std::unique_ptr&& stream_ptr, + Resource::Consumer consumer, + ProtocolVersion protocol, PeerAttributes const& attributes, - OverlayImpl& overlay) + PublicKey const& publicKey, + id_t id) : Child(overlay) , app_(app) , id_(id) @@ -182,7 +181,6 @@ PeerImp::PeerImp( , usage_(consumer) , fee_{Resource::feeTrivialPeer, ""} , slot_(slot) - , request_(std::move(request)) , attributes_(attributes) , ledgerReplayMsgHandler_(app, app.getLedgerReplayer()) { @@ -831,15 +829,6 @@ PeerImp::doAccept() read_buffer_.size() == 0, "ripple::PeerImp::doAccept : empty read buffer"); - JLOG(journal_.debug()) << "doAccept: " << remote_address_; - - auto const sharedValue = stream_ptr_->makeSharedValue(journal_); - - // This shouldn't fail since we already computed - // the shared value successfully in OverlayImpl - if (!sharedValue) - return fail("makeSharedValue: Unexpected failure"); - JLOG(journal_.info()) << "Protocol: " << to_string(protocol_); JLOG(journal_.info()) << "Public Key: " << toBase58(TokenType::NodePublic, publicKey_); @@ -853,43 +842,7 @@ PeerImp::doAccept() JLOG(journal_.info()) << "Cluster name: " << *member; } - overlay_.activate(shared_from_this()); - - // XXX Set timer: connection is in grace period to be useful. - // XXX Set timer: connection idle (idle may vary depending on connection - // type.) - - auto write_buffer = std::make_shared(); - - boost::beast::ostream(*write_buffer) << makeResponse( - !overlay_.peerFinder().config().peerPrivate, - request_, - overlay_.setup().public_ip, - remote_address_.address(), - *sharedValue, - overlay_.setup().networkID, - protocol_, - app_); - - // Write the whole buffer and only start protocol when that's done. - stream_ptr_->async_write( - write_buffer->data(), - [this, write_buffer, self = shared_from_this()]( - error_code ec, std::size_t bytes_transferred) { - // Post completion to the strand to ensure thread safety - boost::asio::post( - strand_, [this, write_buffer, self, ec, bytes_transferred]() { - if (!socketOpen()) - return; - if (ec == boost::asio::error::operation_aborted) - return; - if (ec) - return fail("onWriteResponse", ec); - if (write_buffer->size() == bytes_transferred) - return doProtocolStart(); - return fail("Failed to write header"); - }); - }); + doProtocolStart(); } std::string diff --git a/src/xrpld/overlay/detail/PeerImp.h b/src/xrpld/overlay/detail/PeerImp.h index ef6f448374..ecca36c244 100644 --- a/src/xrpld/overlay/detail/PeerImp.h +++ b/src/xrpld/overlay/detail/PeerImp.h @@ -100,53 +100,6 @@ private: boost::asio::basic_waitable_timer; using Compressed = compression::Compressed; - Application& app_; - id_t const id_; - beast::WrappedSink sink_; - beast::WrappedSink p_sink_; - beast::Journal const journal_; - beast::Journal const p_journal_; - std::unique_ptr stream_ptr_; - boost::asio::strand strand_; - waitable_timer timer_; - - // Updated at each stage of the connection process to reflect - // the current conditions as closely as possible. - beast::IP::Endpoint const remote_address_; - - // These are up here to prevent warnings about order of initializations - // - OverlayImpl& overlay_; - bool const inbound_; - - // Protocol version to use for this link - ProtocolVersion protocol_; - - std::atomic tracking_; - clock_type::time_point trackingTime_; - bool detaching_ = false; - // Node public key of peer. - PublicKey const publicKey_; - std::string name_; - std::shared_mutex mutable nameMutex_; - - // The indices of the smallest and largest ledgers this peer has available - // - LedgerIndex minLedger_ = 0; - LedgerIndex maxLedger_ = 0; - uint256 closedLedgerHash_; - uint256 previousLedgerHash_; - - boost::circular_buffer recentLedgers_{128}; - boost::circular_buffer recentTxSets_{128}; - - std::optional latency_; - std::optional lastPingSeq_; - clock_type::time_point lastPingTime_; - clock_type::time_point const creationTime_; - - reduce_relay::Squelch squelch_; - // Notes on thread locking: // // During an audit it was noted that some member variables that looked @@ -194,31 +147,6 @@ private: } }; - std::mutex mutable recentLock_; - protocol::TMStatusChange last_status_; - Resource::Consumer usage_; - ChargeWithContext fee_; - std::shared_ptr const slot_; - boost::beast::multi_buffer read_buffer_; - http_request_type request_; - PeerAttributes const attributes_; - std::queue> send_queue_; - bool gracefulClose_ = false; - int large_sendq_ = 0; - std::unique_ptr load_event_; - // The highest sequence of each PublisherList that has - // been sent to or received from this peer. - hash_map publisherListSequences_; - - // Queue of transactions' hashes that have not been - // relayed. The hashes are sent once a second to a peer - // and the peer requests missing transactions from the node. - hash_set txQueue_; - - LedgerReplayMsgHandler ledgerReplayMsgHandler_; - - friend class OverlayImpl; - class Metrics { public: @@ -252,6 +180,79 @@ private: Metrics recv; } metrics_; + Application& app_; + id_t const id_; + + beast::WrappedSink sink_; + beast::WrappedSink p_sink_; + beast::Journal const journal_; + beast::Journal const p_journal_; + + std::unique_ptr stream_ptr_; + boost::asio::strand strand_; + waitable_timer timer_; + + // Updated at each stage of the connection process to reflect + // the current conditions as closely as possible. + beast::IP::Endpoint const remote_address_; + + // These are up here to prevent warnings about order of initializations + // + OverlayImpl& overlay_; + bool const inbound_; + + // Protocol version to use for this link + ProtocolVersion protocol_; + + std::atomic tracking_; + clock_type::time_point trackingTime_; + bool detaching_ = false; + // Node public key of peer. + PublicKey const publicKey_; + std::string name_; + std::shared_mutex mutable nameMutex_; + + // The indices of the smallest and largest ledgers this peer has available + // + LedgerIndex minLedger_ = 0; + LedgerIndex maxLedger_ = 0; + uint256 closedLedgerHash_; + uint256 previousLedgerHash_; + + boost::circular_buffer recentLedgers_{128}; + boost::circular_buffer recentTxSets_{128}; + + std::optional latency_; + std::optional lastPingSeq_; + clock_type::time_point lastPingTime_; + clock_type::time_point const creationTime_; + + reduce_relay::Squelch squelch_; + + std::mutex mutable recentLock_; + protocol::TMStatusChange last_status_; + Resource::Consumer usage_; + ChargeWithContext fee_; + std::shared_ptr const slot_; + boost::beast::multi_buffer read_buffer_; + PeerAttributes const attributes_; + std::queue> send_queue_; + bool gracefulClose_ = false; + int large_sendq_ = 0; + std::unique_ptr load_event_; + // The highest sequence of each PublisherList that has + // been sent to or received from this peer. + hash_map publisherListSequences_; + + // Queue of transactions' hashes that have not been + // relayed. The hashes are sent once a second to a peer + // and the peer requests missing transactions from the node. + hash_set txQueue_; + + LedgerReplayMsgHandler ledgerReplayMsgHandler_; + + friend class OverlayImpl; + public: PeerImp(PeerImp const&) = delete; PeerImp& @@ -260,25 +261,23 @@ public: /** Create an active incoming peer from an established ssl connection. */ PeerImp( Application& app, - id_t id, + OverlayImpl& overlay, std::shared_ptr const& slot, - http_request_type&& request, - PublicKey const& publicKey, - ProtocolVersion protocol, - Resource::Consumer consumer, std::unique_ptr&& stream_ptr, + Resource::Consumer consumer, + ProtocolVersion protocol, PeerAttributes const& attributes, - OverlayImpl& overlay); + PublicKey const& publicKey, + id_t id); /** Create outgoing, handshaked peer. */ - // VFALCO legacyPublicKey should be implied by the Slot template PeerImp( Application& app, std::unique_ptr&& stream_ptr, Buffers const& buffers, std::shared_ptr&& slot, - Resource::Consumer usage, + Resource::Consumer consumer, PublicKey const& publicKey, ProtocolVersion protocol, id_t id,