Add ProtocolStart and GracefulClose P2P protocol messages (#3839)

Clean up the peer-to-peer protocol start/close sequences by introducing
START_PROTOCOL and GRACEFUL_CLOSE messages, which sync inbound/outbound
peer send/receive. The GRACEFUL_CLOSE message differentiates application
and link layer failures.

* Introduce the `InboundHandoff` class to manage inbound peer
  instantiation and synchronize the send/receive protocol messages
  between peers.
* Update `OverlayImpl` to utilize the `InboundHandoff` class to manage
  inbound handshakes.
* Update `PeerImp` for improved handling of protocol messages.
* Modify the `Message` class for better maintainability.
* Introduce P2P protocol version `2.3`.
This commit is contained in:
Gregory Tsipenyuk
2023-09-22 18:56:44 -04:00
committed by GitHub
parent 5433e133d5
commit 8f89694fae
14 changed files with 565 additions and 140 deletions

View File

@@ -636,6 +636,7 @@ target_sources (rippled PRIVATE
src/ripple/overlay/impl/Cluster.cpp
src/ripple/overlay/impl/ConnectAttempt.cpp
src/ripple/overlay/impl/Handshake.cpp
src/ripple/overlay/impl/InboundHandoff.cpp
src/ripple/overlay/impl/Message.cpp
src/ripple/overlay/impl/OverlayImpl.cpp
src/ripple/overlay/impl/PeerImp.cpp

View File

@@ -100,6 +100,14 @@ public:
return validatorKey_;
}
/** Get the message type from the payload header.
* First four bytes are the compression/algorithm flag and the payload size.
* Next two bytes are the message type
* @return Message type
*/
int
getType() const;
private:
std::vector<uint8_t> buffer_;
std::vector<uint8_t> bufferCompressed_;
@@ -129,15 +137,6 @@ private:
*/
void
compress();
/** Get the message type from the payload header.
* First four bytes are the compression/algorithm flag and the payload size.
* Next two bytes are the message type
* @param in Payload header pointer
* @return Message type
*/
int
getType(std::uint8_t const* in) const;
};
} // namespace ripple

View File

@@ -39,6 +39,7 @@ enum class ProtocolFeature {
ValidatorListPropagation,
ValidatorList2Propagation,
LedgerReplay,
StartProtocol
};
/** Represents a peer connection in the overlay. */

View File

@@ -365,6 +365,50 @@ transferred between A and B and will not be able to intelligently tamper with th
message stream between Alice and Bob, although she may be still be able to inject
delays or terminate the link.
## Peer Connection Sequence
The _PeerImp_ object can be constructed as either an outbound or an inbound peer.
The outbound peer is constructed by the _ConnectAttempt_ - the client side of
the connection. The inbound peer is constructed by the _InboundHandoff_ -
the server side of the connection. This differentiation of the peers matters only
in terms of the object construction. Once constructed, both inbound and outbound
peer play the same role.
### Outbound Peer
An outbound connection is initiated once a second by
the _OverlayImpl::Timer::on_timer()_ method. This method calls
_OverlayImpl::autoConnect()_, which in turn calls _OverlayImpl::connect()_ for
every outbound endpoint generated by _PeerFinder::autoconnect()_. _connect()_
method constructs _ConnectAttempt_ object. _ConnectAttempt_ attempts to connect
to the provided endpoint and on a successful connection executes the client side
of the handshake protocol described above. If the handshake is successful then
the outbound _PeerImp_ object is constructed and passed to the overlay manager
_OverlayImpl_, which adds the object to the list of peers and children. The latter
maintains a list of objects which might be executing an asynchronous operation
and therefore have to be stopped on shutdown. The outbound _PeerImp_ sends
_TMStartProtocol_ message on start to instruct the connected inbound peer that
the outbound peer is ready to receive the protocol messages.
### Inbound Peer
Construction of the inbound peer is more involved. A multi protocol-server,
_ServerImpl_ located in _src/ripple/server_ module, maintains multiple configured
listening ports. Each listening port allows for multiple protocols including HTTP,
HTTP/S, WebSocket, Secure WebSocket, and the Peer protocol. For simplicity this
sequence describes only the Peer protocol. _ServerImpl_ constructs
_Door_ object for each configured protocol. Each instance of the _Door_ object
accepts connections on the configured port. On a successful connection the _Door_
constructs _SSLHTTPPeer_ object since the Peer protocol always uses SSL
connection. _SSLHTTPPeer_ executes the SSL handshake. If the handshake is successful
then a server handler, _ServerHandlerImpl_ located in _src/ripple/src/impl_, hands off
the connection to the _OverlayImpl::onHandoff()_ method. _onHandoff()_ method
validates the client's HTTP handshake request described above. If the request is
valid then the _InboundHandoff_ object is constructed. _InboundHandoff_ sends
HTTP response to the connected client, constructs the inbound _PeerImp_ object,
and passes it to the overlay manager _OverlayImpl_, which adds the object to
the list of peers and children. Once the inbound _PeerImp_ receives
_TMStartProtocol_ message, it starts sending the protocol messages.
# Ripple Clustering #

View File

@@ -0,0 +1,185 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012-2021 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/overlay/Cluster.h>
#include <ripple/overlay/impl/InboundHandoff.h>
#include <ripple/overlay/impl/PeerImp.h>
#include <boost/beast/core/ostream.hpp>
namespace ripple {
InboundHandoff::InboundHandoff(
Application& app,
id_t id,
std::shared_ptr<PeerFinder::Slot> const& slot,
http_request_type&& request,
PublicKey const& publicKey,
ProtocolVersion protocol,
Resource::Consumer consumer,
std::unique_ptr<stream_type>&& stream_ptr,
OverlayImpl& overlay)
: OverlayImpl::Child(overlay)
, app_(app)
, id_(id)
, sink_(
app_.journal("Peer"),
[id]() {
std::stringstream ss;
ss << "[" << std::setfill('0') << std::setw(3) << id << "] ";
return ss.str();
}())
, journal_(sink_)
, stream_ptr_(std::move(stream_ptr))
, strand_(stream_ptr_->next_layer().socket().get_executor())
, remote_address_(slot->remote_endpoint())
, protocol_(protocol)
, publicKey_(publicKey)
, usage_(consumer)
, slot_(slot)
, request_(std::move(request))
{
}
void
InboundHandoff::run()
{
if (!strand_.running_in_this_thread())
return post(
strand_, std::bind(&InboundHandoff::run, shared_from_this()));
sendResponse();
}
void
InboundHandoff::stop()
{
if (!strand_.running_in_this_thread())
return post(
strand_, std::bind(&InboundHandoff::stop, shared_from_this()));
if (stream_ptr_->next_layer().socket().is_open())
{
JLOG(journal_.debug()) << "Stop";
}
close();
}
void
InboundHandoff::sendResponse()
{
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
// This shouldn't fail since we already computed
// the shared value successfully in OverlayImpl
if (!sharedValue)
return fail("makeSharedValue: Unexpected failure");
JLOG(journal_.info()) << "Protocol: " << to_string(protocol_);
JLOG(journal_.info()) << "Public Key: "
<< toBase58(TokenType::NodePublic, publicKey_);
auto write_buffer = std::make_shared<boost::beast::multi_buffer>();
boost::beast::ostream(*write_buffer) << makeResponse(
!overlay_.peerFinder().config().peerPrivate,
request_,
overlay_.setup().public_ip,
remote_address_.address(),
*sharedValue,
overlay_.setup().networkID,
protocol_,
app_);
// Write the whole buffer and only start protocol when that's done.
boost::asio::async_write(
*stream_ptr_,
write_buffer->data(),
boost::asio::transfer_all(),
bind_executor(
strand_,
[this, write_buffer, self = shared_from_this()](
error_code ec, std::size_t bytes_transferred) {
if (!stream_ptr_->next_layer().socket().is_open())
return;
if (ec == boost::asio::error::operation_aborted)
return;
if (ec)
return fail("onWriteResponse", ec);
if (write_buffer->size() == bytes_transferred)
return createPeer();
return fail("Failed to write header");
}));
}
void
InboundHandoff::fail(std::string const& name, error_code const& ec)
{
if (socket().is_open())
{
JLOG(journal_.warn())
<< name << " from " << toBase58(TokenType::NodePublic, publicKey_)
<< " at " << remote_address_.to_string() << ": " << ec.message();
}
close();
}
void
InboundHandoff::fail(std::string const& reason)
{
if (journal_.active(beast::severities::kWarning) && socket().is_open())
{
auto const n = app_.cluster().member(publicKey_);
JLOG(journal_.warn())
<< (n ? remote_address_.to_string() : *n) << " failed: " << reason;
}
close();
}
void
InboundHandoff::close()
{
if (socket().is_open())
{
socket().close();
JLOG(journal_.debug()) << "Closed";
}
}
void
InboundHandoff::createPeer()
{
auto peer = std::make_shared<PeerImp>(
app_,
id_,
slot_,
std::move(request_),
publicKey_,
protocol_,
usage_,
std::move(stream_ptr_),
overlay_);
overlay_.add_active(peer);
}
InboundHandoff::socket_type&
InboundHandoff::socket() const
{
return stream_ptr_->next_layer().socket();
}
} // namespace ripple

View File

@@ -0,0 +1,102 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012-2021 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_OVERLAY_INBOUNDHANDOFF_H_INCLUDED
#define RIPPLE_OVERLAY_INBOUNDHANDOFF_H_INCLUDED
#include <ripple/overlay/impl/OverlayImpl.h>
namespace ripple {
/** Sends HTTP response. Instantiates the inbound peer
* once the response is sent. Maintains all data members
* required for the inbound peer instantiation.
*/
class InboundHandoff : public OverlayImpl::Child,
public std::enable_shared_from_this<InboundHandoff>
{
private:
using error_code = boost::system::error_code;
using socket_type = boost::asio::ip::tcp::socket;
using middle_type = boost::beast::tcp_stream;
using stream_type = boost::beast::ssl_stream<middle_type>;
using id_t = Peer::id_t;
Application& app_;
id_t const id_;
beast::WrappedSink sink_;
beast::Journal const journal_;
std::unique_ptr<stream_type> stream_ptr_;
boost::asio::strand<boost::asio::executor> strand_;
beast::IP::Endpoint const remote_address_;
ProtocolVersion protocol_;
PublicKey const publicKey_;
Resource::Consumer usage_;
std::shared_ptr<PeerFinder::Slot> const slot_;
http_request_type request_;
public:
virtual ~InboundHandoff() override = default;
InboundHandoff(
Application& app,
id_t id,
std::shared_ptr<PeerFinder::Slot> const& slot,
http_request_type&& request,
PublicKey const& publicKey,
ProtocolVersion protocol,
Resource::Consumer consumer,
std::unique_ptr<stream_type>&& stream_ptr,
OverlayImpl& overlay);
// This class isn't meant to be copied
InboundHandoff(InboundHandoff const&) = delete;
InboundHandoff&
operator=(InboundHandoff const&) = delete;
/** Start the handshake */
void
run();
/** Stop the child */
void
stop() override;
private:
/** Send upgrade response to the client */
void
sendResponse();
/** Instantiate and run the overlay peer */
void
createPeer();
/** Log and close */
void
fail(std::string const& name, error_code const& ec);
/** Log and close */
void
fail(std::string const& reason);
/** Close connection */
void
close();
/** Get underlying socket */
socket_type&
socket() const;
};
} // namespace ripple
#endif // RIPPLE_OVERLAY_INBOUNDHANDOFF_H_INCLUDED

View File

@@ -70,7 +70,7 @@ Message::compress()
using namespace ripple::compression;
auto const messageBytes = buffer_.size() - headerBytes;
auto type = getType(buffer_.data());
auto type = getType();
bool const compressible = [&] {
if (messageBytes <= 70)
@@ -221,9 +221,10 @@ Message::getBuffer(Compressed tryCompressed)
}
int
Message::getType(std::uint8_t const* in) const
Message::getType() const
{
int type = (static_cast<int>(*(in + 4)) << 8) + *(in + 5);
int type =
(static_cast<int>(*(buffer_.data() + 4)) << 8) + *(buffer_.data() + 5);
return type;
}

View File

@@ -31,6 +31,7 @@
#include <ripple/nodestore/DatabaseShard.h>
#include <ripple/overlay/Cluster.h>
#include <ripple/overlay/impl/ConnectAttempt.h>
#include <ripple/overlay/impl/InboundHandoff.h>
#include <ripple/overlay/impl/PeerImp.h>
#include <ripple/overlay/predicates.h>
#include <ripple/peerfinder/make_Manager.h>
@@ -279,7 +280,7 @@ OverlayImpl::onHandoff(
}
}
auto const peer = std::make_shared<PeerImp>(
auto const ih = std::make_shared<InboundHandoff>(
app_,
id,
slot,
@@ -290,18 +291,10 @@ OverlayImpl::onHandoff(
std::move(stream_ptr),
*this);
{
// As we are not on the strand, run() must be called
// while holding the lock, otherwise new I/O can be
// queued after a call to stop().
std::lock_guard<decltype(mutex_)> lock(mutex_);
{
auto const result = m_peers.emplace(peer->slot(), peer);
assert(result.second);
(void)result.second;
}
list_.emplace(peer.get(), peer);
list_.emplace(ih.get(), ih);
peer->run();
ih->run();
}
handoff.moved = true;
return handoff;

View File

@@ -66,6 +66,30 @@ std::chrono::milliseconds constexpr peerHighLatency{300};
std::chrono::seconds constexpr peerTimerInterval{60};
} // namespace
std::string
closeReasonToString(protocol::TMCloseReason reason)
{
switch (reason)
{
case protocol::TMCloseReason::crCHARGE_RESOURCES:
return "Charge: Resources";
case protocol::TMCloseReason::crMALFORMED_HANDSHAKE1:
return "Malformed handshake data (1)";
case protocol::TMCloseReason::crMALFORMED_HANDSHAKE2:
return "Malformed handshake data (2)";
case protocol::TMCloseReason::crMALFORMED_HANDSHAKE3:
return "Malformed handshake data (3)";
case protocol::TMCloseReason::crLARGE_SENDQUEUE:
return "Large send queue";
case protocol::TMCloseReason::crNOT_USEFUL:
return "Not useful";
case protocol::TMCloseReason::crPING_TIMEOUT:
return "Ping timeout";
default:
return "Unknown reason";
}
}
PeerImp::PeerImp(
Application& app,
id_t id,
@@ -132,6 +156,11 @@ PeerImp::PeerImp(
<< " tx reduce-relay enabled "
<< txReduceRelayEnabled_ << " on " << remote_address_
<< " " << id_;
if (auto member = app_.cluster().member(publicKey_))
{
name_ = *member;
JLOG(journal_.info()) << "Cluster name: " << *member;
}
}
PeerImp::~PeerImp()
@@ -182,7 +211,7 @@ PeerImp::run()
closed = parseLedgerHash(iter->value());
if (!closed)
fail("Malformed handshake data (1)");
fail(protocol::TMCloseReason::crMALFORMED_HANDSHAKE1);
}
if (auto const iter = headers_.find("Previous-Ledger");
@@ -191,11 +220,11 @@ PeerImp::run()
previous = parseLedgerHash(iter->value());
if (!previous)
fail("Malformed handshake data (2)");
fail(protocol::TMCloseReason::crMALFORMED_HANDSHAKE2);
}
if (previous && !closed)
fail("Malformed handshake data (3)");
fail(protocol::TMCloseReason::crMALFORMED_HANDSHAKE3);
{
std::lock_guard<std::mutex> sl(recentLock_);
@@ -205,9 +234,6 @@ PeerImp::run()
previousLedgerHash_ = *previous;
}
if (inbound_)
doAccept();
else
doProtocolStart();
// Anything else that needs to be done with the connection should be
@@ -350,7 +376,7 @@ PeerImp::charge(Resource::Charge const& fee)
{
// Sever the connection
overlay_.incPeerDisconnectCharges();
fail("charge: Resources");
fail(protocol::TMCloseReason::crCHARGE_RESOURCES);
}
}
@@ -508,6 +534,8 @@ PeerImp::supportsFeature(ProtocolFeature f) const
return protocol_ >= make_protocol(2, 2);
case ProtocolFeature::LedgerReplay:
return ledgerReplayEnabled_;
case ProtocolFeature::StartProtocol:
return protocol_ >= make_protocol(2, 3);
}
return false;
}
@@ -600,22 +628,34 @@ PeerImp::close()
}
void
PeerImp::fail(std::string const& reason)
PeerImp::fail(protocol::TMCloseReason reason)
{
if (!strand_.running_in_this_thread())
return post(
strand_,
std::bind(
(void (Peer::*)(std::string const&)) & PeerImp::fail,
(void (Peer::*)(protocol::TMCloseReason)) & PeerImp::fail,
shared_from_this(),
reason));
if (journal_.active(beast::severities::kWarning) && socket_.is_open())
{
std::string const n = name();
JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n)
<< " failed: " << reason;
<< " failed: " << closeReasonToString(reason);
}
close();
// erase all outstanding messages except for the one
// currently being executed
if (send_queue_.size() > 1)
{
decltype(send_queue_) q({send_queue_.front()});
send_queue_.swap(q);
}
closeOnWriteComplete_ = true;
protocol::TMGracefulClose tmGC;
tmGC.set_reason(reason);
send(std::make_shared<Message>(tmGC, protocol::mtGRACEFUL_CLOSE));
}
void
@@ -707,7 +747,7 @@ PeerImp::onTimer(error_code const& ec)
if (large_sendq_++ >= Tuning::sendqIntervals)
{
fail("Large send queue");
fail(protocol::TMCloseReason::crLARGE_SENDQUEUE);
return;
}
@@ -726,7 +766,7 @@ PeerImp::onTimer(error_code const& ec)
(duration > app_.config().MAX_UNKNOWN_TIME)))
{
overlay_.peerFinder().on_failure(slot_);
fail("Not useful");
fail(protocol::TMCloseReason::crLARGE_SENDQUEUE);
return;
}
}
@@ -734,7 +774,7 @@ PeerImp::onTimer(error_code const& ec)
// Already waiting for PONG
if (lastPingSeq_)
{
fail("Ping Timeout");
fail(protocol::TMCloseReason::crPING_TIMEOUT);
return;
}
@@ -766,71 +806,6 @@ PeerImp::onShutdown(error_code ec)
}
//------------------------------------------------------------------------------
void
PeerImp::doAccept()
{
assert(read_buffer_.size() == 0);
JLOG(journal_.debug()) << "doAccept: " << remote_address_;
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
// This shouldn't fail since we already computed
// the shared value successfully in OverlayImpl
if (!sharedValue)
return fail("makeSharedValue: Unexpected failure");
JLOG(journal_.info()) << "Protocol: " << to_string(protocol_);
JLOG(journal_.info()) << "Public Key: "
<< toBase58(TokenType::NodePublic, publicKey_);
if (auto member = app_.cluster().member(publicKey_))
{
{
std::unique_lock lock{nameMutex_};
name_ = *member;
}
JLOG(journal_.info()) << "Cluster name: " << *member;
}
overlay_.activate(shared_from_this());
// XXX Set timer: connection is in grace period to be useful.
// XXX Set timer: connection idle (idle may vary depending on connection
// type.)
auto write_buffer = std::make_shared<boost::beast::multi_buffer>();
boost::beast::ostream(*write_buffer) << makeResponse(
!overlay_.peerFinder().config().peerPrivate,
request_,
overlay_.setup().public_ip,
remote_address_.address(),
*sharedValue,
overlay_.setup().networkID,
protocol_,
app_);
// Write the whole buffer and only start protocol when that's done.
boost::asio::async_write(
stream_,
write_buffer->data(),
boost::asio::transfer_all(),
bind_executor(
strand_,
[this, write_buffer, self = shared_from_this()](
error_code ec, std::size_t bytes_transferred) {
if (!socket_.is_open())
return;
if (ec == boost::asio::error::operation_aborted)
return;
if (ec)
return fail("onWriteResponse", ec);
if (write_buffer->size() == bytes_transferred)
return doProtocolStart();
return fail("Failed to write header");
}));
}
std::string
PeerImp::name() const
@@ -854,30 +829,29 @@ PeerImp::doProtocolStart()
{
onReadMessage(error_code(), 0);
// Send all the validator lists that have been loaded
if (inbound_ && supportsFeature(ProtocolFeature::ValidatorListPropagation))
{
app_.validators().for_each_available(
[&](std::string const& manifest,
std::uint32_t version,
std::map<std::size_t, ValidatorBlobInfo> const& blobInfos,
PublicKey const& pubKey,
std::size_t maxSequence,
uint256 const& hash) {
ValidatorList::sendValidatorList(
*this,
0,
pubKey,
maxSequence,
version,
manifest,
blobInfos,
app_.getHashRouter(),
p_journal_);
bool supportedProtocol = supportsFeature(ProtocolFeature::StartProtocol);
// Don't send it next time.
app_.getHashRouter().addSuppressionPeer(hash, id_);
});
if (!inbound_)
{
// Instruct connected inbound peer to start sending
// protocol messages
if (supportedProtocol)
{
JLOG(journal_.debug())
<< "doProtocolStart(): outbound sending mtSTART_PROTOCOL to "
<< remote_address_;
protocol::TMStartProtocol tmPS;
tmPS.set_starttime(std::chrono::duration_cast<std::chrono::seconds>(
clock_type::now().time_since_epoch())
.count());
send(std::make_shared<Message>(tmPS, protocol::mtSTART_PROTOCOL));
}
else
{
JLOG(journal_.debug()) << "doProtocolStart(): outbound connected "
"to an older protocol on "
<< remote_address_ << " " << protocol_.first
<< " " << protocol_.second;
}
if (auto m = overlay_.getManifestsMessage())
@@ -886,7 +860,18 @@ PeerImp::doProtocolStart()
// Request shard info from peer
protocol::TMGetPeerShardInfoV2 tmGPS;
tmGPS.set_relays(0);
send(std::make_shared<Message>(tmGPS, protocol::mtGET_PEER_SHARD_INFO_V2));
send(std::make_shared<Message>(
tmGPS, protocol::mtGET_PEER_SHARD_INFO_V2));
}
// Backward compatibility with the older protocols
else if (!supportedProtocol)
{
JLOG(journal_.debug())
<< "doProtocolStart(): inbound handling of an older protocol on "
<< remote_address_ << " " << protocol_.first << " "
<< protocol_.second;
onStartProtocol();
}
setTimer();
}
@@ -954,7 +939,11 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
if (!socket_.is_open())
return;
if (ec == boost::asio::error::operation_aborted)
{
if (closeOnWriteComplete_)
close();
return;
}
if (ec)
return fail("onWriteMessage", ec);
if (auto stream = journal_.trace())
@@ -968,6 +957,11 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
metrics_.sent.add_message(bytes_transferred);
assert(!send_queue_.empty());
if (send_queue_.front()->getType() == protocol::mtGRACEFUL_CLOSE)
{
close();
return;
}
send_queue_.pop();
if (!send_queue_.empty())
{
@@ -2947,6 +2941,69 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
<< "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
}
void
PeerImp::onStartProtocol()
{
JLOG(journal_.debug()) << "onStartProtocol(): " << remote_address_;
// Send all the validator lists that have been loaded
if (supportsFeature(ProtocolFeature::ValidatorListPropagation))
{
app_.validators().for_each_available(
[&](std::string const& manifest,
std::uint32_t version,
std::map<std::size_t, ValidatorBlobInfo> const& blobInfos,
PublicKey const& pubKey,
std::size_t maxSequence,
uint256 const& hash) {
ValidatorList::sendValidatorList(
*this,
0,
pubKey,
maxSequence,
version,
manifest,
blobInfos,
app_.getHashRouter(),
p_journal_);
// Don't send it next time.
app_.getHashRouter().addSuppressionPeer(hash, id_);
});
}
if (auto m = overlay_.getManifestsMessage())
send(m);
// Request shard info from peer
protocol::TMGetPeerShardInfoV2 tmGPS;
tmGPS.set_relays(0);
send(std::make_shared<Message>(tmGPS, protocol::mtGET_PEER_SHARD_INFO_V2));
}
void
PeerImp::onMessage(std::shared_ptr<protocol::TMStartProtocol> const& m)
{
JLOG(journal_.debug()) << "onMessage(TMStartProtocol): " << remote_address_;
onStartProtocol();
}
void
PeerImp::onMessage(const std::shared_ptr<protocol::TMGracefulClose>& m)
{
using on_message_fn =
void (PeerImp::*)(std::shared_ptr<protocol::TMGracefulClose> const&);
if (!strand_.running_in_this_thread())
return post(
strand_,
std::bind(
(on_message_fn)&PeerImp::onMessage, shared_from_this(), m));
JLOG(journal_.info()) << "got graceful close from: " << remote_address_
<< " reason: " << closeReasonToString(m->reason());
close();
}
//--------------------------------------------------------------------------
void

View File

@@ -180,6 +180,8 @@ private:
bool vpReduceRelayEnabled_ = false;
bool ledgerReplayEnabled_ = false;
LedgerReplayMsgHandler ledgerReplayMsgHandler_;
// close connection when async write is complete
bool closeOnWriteComplete_ = false;
friend class OverlayImpl;
@@ -235,7 +237,7 @@ public:
/** Create outgoing, handshaked peer. */
// VFALCO legacyPublicKey should be implied by the Slot
template <class Buffers>
template <typename Buffers>
PeerImp(
Application& app,
std::unique_ptr<stream_type>&& stream_ptr,
@@ -413,7 +415,7 @@ public:
isHighLatency() const override;
void
fail(std::string const& reason);
fail(protocol::TMCloseReason reason);
// Return any known shard info from this peer and its sub peers
[[nodiscard]] hash_map<PublicKey, NodeStore::ShardInfo> const
@@ -458,9 +460,6 @@ private:
void
onShutdown(error_code ec);
void
doAccept();
std::string
name() const;
@@ -584,6 +583,10 @@ public:
onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m);
void
onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m);
void
onMessage(std::shared_ptr<protocol::TMStartProtocol> const& m);
void
onMessage(std::shared_ptr<protocol::TMGracefulClose> const& m);
private:
//--------------------------------------------------------------------------
@@ -642,6 +645,9 @@ private:
void
processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m);
void
onStartProtocol();
};
//------------------------------------------------------------------------------

View File

@@ -112,6 +112,10 @@ protocolMessageName(int type)
return "get_peer_shard_info_v2";
case protocol::mtPEER_SHARD_INFO_V2:
return "peer_shard_info_v2";
case protocol::mtSTART_PROTOCOL:
return "start_protocol";
case protocol::mtGRACEFUL_CLOSE:
return "graceful_close";
default:
break;
}
@@ -492,6 +496,14 @@ invokeProtocolMessage(
success = detail::invoke<protocol::TMPeerShardInfoV2>(
*header, buffers, handler);
break;
case protocol::mtSTART_PROTOCOL:
success = detail::invoke<protocol::TMStartProtocol>(
*header, buffers, handler);
break;
case protocol::mtGRACEFUL_CLOSE:
success = detail::invoke<protocol::TMGracefulClose>(
*header, buffers, handler);
break;
default:
handler.onMessageUnknown(header->message_type);
success = true;

View File

@@ -37,7 +37,8 @@ namespace ripple {
constexpr ProtocolVersion const supportedProtocolList[]
{
{2, 1},
{2, 2}
{2, 2},
{2, 3}
};
// clang-format on

View File

@@ -33,6 +33,8 @@ enum MessageType
mtPEER_SHARD_INFO_V2 = 62;
mtHAVE_TRANSACTIONS = 63;
mtTRANSACTIONS = 64;
mtSTART_PROTOCOL = 65;
mtGRACEFUL_CLOSE = 66;
}
// token, iterations, target, challenge = issue demand for proof of work
@@ -452,3 +454,24 @@ message TMHaveTransactions
repeated bytes hashes = 1;
}
message TMStartProtocol
{
required uint64 startTime = 1;
}
enum TMCloseReason
{
crMALFORMED_HANDSHAKE1 = 1;
crMALFORMED_HANDSHAKE2 = 2;
crMALFORMED_HANDSHAKE3 = 3;
crCHARGE_RESOURCES = 4;
crLARGE_SENDQUEUE = 5;
crNOT_USEFUL = 6;
crPING_TIMEOUT = 7;
}
message TMGracefulClose
{
required TMCloseReason reason = 1;
}

View File

@@ -88,7 +88,7 @@ public:
BEAST_EXPECT(
negotiateProtocolVersion(
"RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/999.999") ==
make_protocol(2, 2));
make_protocol(2, 3));
BEAST_EXPECT(
negotiateProtocolVersion("XRPL/999.999, WebSocket/1.0") ==
std::nullopt);