Improve protocol-level handshaking protocol:

This commit restructures the HTTP based protocol negotiation that `rippled`
executes and introduces support for negotiation of compression for peer
links which, if implemented, should result in significant bandwidth savings
for some server roles.

This commit also introduces the new `[network_id]` configuration option
that administrators can use to specify which network the server is part of
and intends to join. This makes it possible for servers from different
networks to drop the link early.

The changeset also improves the log messages generated when negotiation
of a peer link upgrade fails. In the past, no useful information would
be logged, making it more difficult for admins to troubleshoot errors.

This commit also fixes RIPD-237 and RIPD-451
This commit is contained in:
Nik Bougalis
2019-06-22 17:02:13 -07:00
parent 3ea525430e
commit f6916bfd42
41 changed files with 1496 additions and 1581 deletions

View File

@@ -20,6 +20,7 @@
#ifndef RIPPLE_OVERLAY_PROTOCOLMESSAGE_H_INCLUDED
#define RIPPLE_OVERLAY_PROTOCOLMESSAGE_H_INCLUDED
#include <ripple/basics/ByteUtilities.h>
#include <ripple/protocol/messages.h>
#include <ripple/overlay/Message.h>
#include <ripple/overlay/impl/ZeroCopyStream.h>
@@ -41,17 +42,13 @@ protocolMessageName (int type)
{
switch (type)
{
case protocol::mtHELLO: return "hello";
case protocol::mtMANIFESTS: return "manifests";
case protocol::mtPING: return "ping";
case protocol::mtPROOFOFWORK: return "proof_of_work";
case protocol::mtCLUSTER: return "cluster";
case protocol::mtGET_SHARD_INFO: return "get_shard_info";
case protocol::mtSHARD_INFO: return "shard_info";
case protocol::mtGET_PEER_SHARD_INFO: return "get_peer_shard_info";
case protocol::mtPEER_SHARD_INFO: return "peer_shard_info";
case protocol::mtGET_PEERS: return "get_peers";
case protocol::mtPEERS: return "peers";
case protocol::mtENDPOINTS: return "endpoints";
case protocol::mtTRANSACTION: return "tx";
case protocol::mtGET_LEDGER: return "get_ledger";
@@ -63,33 +60,83 @@ protocolMessageName (int type)
case protocol::mtGET_OBJECTS: return "get_objects";
default:
break;
};
}
return "unknown";
}
namespace detail {
template <class T, class Buffers, class Handler>
std::enable_if_t<std::is_base_of<
::google::protobuf::Message, T>::value,
boost::system::error_code>
invoke (int type, Buffers const& buffers,
struct MessageHeader
{
/** The size of the message on the wire.
@note This is the sum of sizes of the header and the payload.
*/
std::uint32_t total_wire_size = 0;
/** The size of the header associated with this message. */
std::uint32_t header_size = 0;
/** The size of the payload on the wire. */
std::uint32_t payload_wire_size = 0;
/** The type of the message. */
std::uint16_t message_type = 0;
};
template <class BufferSequence>
boost::optional<MessageHeader> parseMessageHeader(
BufferSequence const& bufs,
std::size_t size)
{
auto iter = boost::asio::buffers_iterator<BufferSequence, std::uint8_t>::begin(bufs);
MessageHeader hdr;
// Version 1 header: uncompressed payload.
// The top six bits of the first byte are 0.
if ((*iter & 0xFC) == 0)
{
hdr.header_size = 6;
if (size < hdr.header_size)
return {};
for (int i = 0; i != 4; ++i)
hdr.payload_wire_size = (hdr.payload_wire_size << 8) + *iter++;
hdr.total_wire_size = hdr.header_size + hdr.payload_wire_size;
for (int i = 0; i != 2; ++i)
hdr.message_type = (hdr.message_type << 8) + *iter++;
return hdr;
}
return {};
}
template <class T, class Buffers, class Handler,
class = std::enable_if_t<std::is_base_of<::google::protobuf::Message, T>::value>>
bool
invoke (
MessageHeader const& header,
Buffers const& buffers,
Handler& handler)
{
auto const m = std::make_shared<T>();
ZeroCopyInputStream<Buffers> stream(buffers);
stream.Skip(Message::kHeaderBytes);
auto const m (std::make_shared<T>());
stream.Skip(header.header_size);
if (! m->ParseFromZeroCopyStream(&stream))
return boost::system::errc::make_error_code(
boost::system::errc::invalid_argument);
auto ec = handler.onMessageBegin (type, m,
Message::kHeaderBytes + Message::size (buffers));
if (! ec)
{
handler.onMessage (m);
handler.onMessageEnd (type, m);
}
return ec;
return false;
handler.onMessageBegin (header.message_type, m, header.payload_wire_size);
handler.onMessage (m);
handler.onMessageEnd (header.message_type, m);
return true;
}
}
@@ -106,82 +153,100 @@ std::pair <std::size_t, boost::system::error_code>
invokeProtocolMessage (Buffers const& buffers, Handler& handler)
{
std::pair<std::size_t,boost::system::error_code> result = { 0, {} };
boost::system::error_code& ec = result.second;
auto const bs = boost::asio::buffer_size(buffers);
auto const size = boost::asio::buffer_size(buffers);
// If we don't even have enough bytes for the header, there's no point
// in doing any work.
if (bs < Message::kHeaderBytes)
if (size == 0)
return result;
if (bs > Message::kMaxMessageSize)
auto header = detail::parseMessageHeader(buffers, size);
// If we can't parse the header then it may be that we don't have enough
// bytes yet, or because the message was cut off.
if (!header)
return result;
// We implement a maximum size for protocol messages. Sending a message
// whose size exceeds this may result in the connection being dropped. A
// larger message size may be supported in the future or negotiated as
// part of a protocol upgrade.
if (header->payload_wire_size > megabytes(64))
{
result.second = make_error_code(boost::system::errc::message_size);
return result;
}
auto const size = Message::kHeaderBytes + Message::size(buffers);
if (bs < size)
// We don't have the whole message yet. This isn't an error but we have
// nothing to do.
if (header->total_wire_size > size)
return result;
auto const type = Message::type(buffers);
bool success;
switch (type)
switch (header->message_type)
{
case protocol::mtHELLO: ec = detail::invoke<protocol::TMHello> (type, buffers, handler); break;
case protocol::mtMANIFESTS: ec = detail::invoke<protocol::TMManifests> (type, buffers, handler); break;
case protocol::mtPING: ec = detail::invoke<protocol::TMPing> (type, buffers, handler); break;
case protocol::mtCLUSTER: ec = detail::invoke<protocol::TMCluster> (type, buffers, handler); break;
case protocol::mtGET_SHARD_INFO: ec = detail::invoke<protocol::TMGetShardInfo> (type, buffers, handler); break;
case protocol::mtSHARD_INFO: ec = detail::invoke<protocol::TMShardInfo>(type, buffers, handler); break;
case protocol::mtGET_PEER_SHARD_INFO: ec = detail::invoke<protocol::TMGetPeerShardInfo> (type, buffers, handler); break;
case protocol::mtPEER_SHARD_INFO: ec = detail::invoke<protocol::TMPeerShardInfo>(type, buffers, handler); break;
case protocol::mtGET_PEERS: ec = detail::invoke<protocol::TMGetPeers> (type, buffers, handler); break;
case protocol::mtPEERS: ec = detail::invoke<protocol::TMPeers> (type, buffers, handler); break;
case protocol::mtENDPOINTS: ec = detail::invoke<protocol::TMEndpoints> (type, buffers, handler); break;
case protocol::mtTRANSACTION: ec = detail::invoke<protocol::TMTransaction> (type, buffers, handler); break;
case protocol::mtGET_LEDGER: ec = detail::invoke<protocol::TMGetLedger> (type, buffers, handler); break;
case protocol::mtLEDGER_DATA: ec = detail::invoke<protocol::TMLedgerData> (type, buffers, handler); break;
case protocol::mtPROPOSE_LEDGER: ec = detail::invoke<protocol::TMProposeSet> (type, buffers, handler); break;
case protocol::mtSTATUS_CHANGE: ec = detail::invoke<protocol::TMStatusChange> (type, buffers, handler); break;
case protocol::mtHAVE_SET: ec = detail::invoke<protocol::TMHaveTransactionSet> (type, buffers, handler); break;
case protocol::mtVALIDATION: ec = detail::invoke<protocol::TMValidation> (type, buffers, handler); break;
case protocol::mtGET_OBJECTS: ec = detail::invoke<protocol::TMGetObjectByHash> (type, buffers, handler); break;
case protocol::mtMANIFESTS:
success = detail::invoke<protocol::TMManifests>(*header, buffers, handler);
break;
case protocol::mtPING:
success = detail::invoke<protocol::TMPing>(*header, buffers, handler);
break;
case protocol::mtCLUSTER:
success = detail::invoke<protocol::TMCluster>(*header, buffers, handler);
break;
case protocol::mtGET_SHARD_INFO:
success = detail::invoke<protocol::TMGetShardInfo>(*header, buffers, handler);
break;
case protocol::mtSHARD_INFO:
success = detail::invoke<protocol::TMShardInfo>(*header, buffers, handler);
break;
case protocol::mtGET_PEER_SHARD_INFO:
success = detail::invoke<protocol::TMGetPeerShardInfo>(*header, buffers, handler);
break;
case protocol::mtPEER_SHARD_INFO:
success = detail::invoke<protocol::TMPeerShardInfo>(*header, buffers, handler);
break;
case protocol::mtENDPOINTS:
success = detail::invoke<protocol::TMEndpoints>(*header, buffers, handler);
break;
case protocol::mtTRANSACTION:
success = detail::invoke<protocol::TMTransaction>(*header, buffers, handler);
break;
case protocol::mtGET_LEDGER:
success = detail::invoke<protocol::TMGetLedger>(*header, buffers, handler);
break;
case protocol::mtLEDGER_DATA:
success = detail::invoke<protocol::TMLedgerData>(*header, buffers, handler);
break;
case protocol::mtPROPOSE_LEDGER:
success = detail::invoke<protocol::TMProposeSet>(*header, buffers, handler);
break;
case protocol::mtSTATUS_CHANGE:
success = detail::invoke<protocol::TMStatusChange>(*header, buffers, handler);
break;
case protocol::mtHAVE_SET:
success = detail::invoke<protocol::TMHaveTransactionSet>(*header, buffers, handler);
break;
case protocol::mtVALIDATION:
success = detail::invoke<protocol::TMValidation>(*header, buffers, handler);
break;
case protocol::mtGET_OBJECTS:
success = detail::invoke<protocol::TMGetObjectByHash>(*header, buffers, handler);
break;
default:
ec = handler.onMessageUnknown (type);
handler.onMessageUnknown (header->message_type);
success = true;
break;
}
if (! ec)
result.first = size;
result.first = header->total_wire_size;
if (!success)
result.second = make_error_code(boost::system::errc::bad_message);
return result;
}
/** Write a protocol message to a streambuf. */
template <class Streambuf>
void
write (Streambuf& streambuf,
::google::protobuf::Message const& m, int type,
std::size_t blockBytes)
{
auto const size = m.ByteSize();
std::array<std::uint8_t, 6> v;
v[0] = static_cast<std::uint8_t>((size >> 24) & 0xFF);
v[1] = static_cast<std::uint8_t>((size >> 16) & 0xFF);
v[2] = static_cast<std::uint8_t>((size >> 8) & 0xFF);
v[3] = static_cast<std::uint8_t>( size & 0xFF);
v[4] = static_cast<std::uint8_t>((type >> 8) & 0xFF);
v[5] = static_cast<std::uint8_t>( type & 0xFF);
streambuf.commit(boost::asio::buffer_copy(
streambuf.prepare(Message::kHeaderBytes),
boost::asio::buffer(v)));
ZeroCopyOutputStream<Streambuf> stream (
streambuf, blockBytes);
m.SerializeToZeroCopyStream(&stream);
}
} // ripple
#endif