Add protocol message compression support:

* Peers negotiate compression via HTTP Header "X-Offer-Compression: lz4"
* Messages greater than 70 bytes and protocol type messages MANIFESTS,
  ENDPOINTS, TRANSACTION, GET_LEDGER, LEDGER_DATA, GET_OBJECT,
  and VALIDATORLIST are compressed
* If the compressed message is larger than the uncompressed message
  then the uncompressed message is sent
* Compression flag and the compression algorithm type are included
  in the message header
* Only LZ4 block compression is currently supported
This commit is contained in:
Gregory Tsipenyuk
2020-02-15 10:50:52 -05:00
committed by manojsdoshi
parent ade5eb71cf
commit 758a3792eb
14 changed files with 875 additions and 37 deletions

View File

@@ -855,6 +855,7 @@ target_sources (rippled PRIVATE
src/test/overlay/ProtocolVersion_test.cpp src/test/overlay/ProtocolVersion_test.cpp
src/test/overlay/cluster_test.cpp src/test/overlay/cluster_test.cpp
src/test/overlay/short_read_test.cpp src/test/overlay/short_read_test.cpp
src/test/overlay/compression_test.cpp
#[===============================[ #[===============================[
test sources: test sources:
subdir: peerfinder subdir: peerfinder

View File

@@ -0,0 +1,153 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLED_COMPRESSIONALGORITHMS_H_INCLUDED
#define RIPPLED_COMPRESSIONALGORITHMS_H_INCLUDED
#include <ripple/basics/contract.h>
#include <lz4.h>
#include <algorithm>
namespace ripple {
namespace compression_algorithms {
/** Convenience wrapper for Throw
* @param message Message to log/throw
*/
inline void doThrow(const char *message)
{
Throw<std::runtime_error>(message);
}
/** LZ4 block compression.
* @tparam BufferFactory Callable object or lambda.
* Takes the requested buffer size and returns allocated buffer pointer.
* @param in Data to compress
* @param inSize Size of the data
* @param bf Compressed buffer allocator
* @return Size of compressed data, or zero if failed to compress
*/
template<typename BufferFactory>
std::size_t
lz4Compress(void const* in,
std::size_t inSize, BufferFactory&& bf)
{
if (inSize > UINT32_MAX)
doThrow("lz4 compress: invalid size");
auto const outCapacity = LZ4_compressBound(inSize);
// Request the caller to allocate and return the buffer to hold compressed data
auto compressed = bf(outCapacity);
auto compressedSize = LZ4_compress_default(
reinterpret_cast<const char*>(in),
reinterpret_cast<char*>(compressed),
inSize,
outCapacity);
if (compressedSize == 0)
doThrow("lz4 compress: failed");
return compressedSize;
}
/**
* @param in Compressed data
* @param inSize Size of compressed data
* @param decompressed Buffer to hold decompressed data
* @param decompressedSize Size of the decompressed buffer
* @return size of the decompressed data
*/
inline
std::size_t
lz4Decompress(std::uint8_t const* in, std::size_t inSize,
std::uint8_t* decompressed, std::size_t decompressedSize)
{
auto ret = LZ4_decompress_safe(reinterpret_cast<const char*>(in),
reinterpret_cast<char*>(decompressed), inSize, decompressedSize);
if (ret <= 0 || ret != decompressedSize)
doThrow("lz4 decompress: failed");
return decompressedSize;
}
/** LZ4 block decompression.
* @tparam InputStream ZeroCopyInputStream
* @param in Input source stream
* @param inSize Size of compressed data
* @param decompressed Buffer to hold decompressed data
* @param decompressedSize Size of the decompressed buffer
* @return size of the decompressed data
*/
template<typename InputStream>
std::size_t
lz4Decompress(InputStream& in, std::size_t inSize,
std::uint8_t* decompressed, std::size_t decompressedSize)
{
std::vector<std::uint8_t> compressed;
std::uint8_t const* chunk = nullptr;
int chunkSize = 0;
int copiedInSize = 0;
auto const currentBytes = in.ByteCount();
// Use the first chunk if it is >= inSize bytes of the compressed message.
// Otherwise copy inSize bytes of chunks into compressed buffer and
// use the buffer to decompress.
while (in.Next(reinterpret_cast<void const**>(&chunk), &chunkSize))
{
if (copiedInSize == 0)
{
if (chunkSize >= inSize)
{
copiedInSize = inSize;
break;
}
compressed.resize(inSize);
}
chunkSize = chunkSize < (inSize - copiedInSize) ? chunkSize : (inSize - copiedInSize);
std::copy(chunk, chunk + chunkSize, compressed.data() + copiedInSize);
copiedInSize += chunkSize;
if (copiedInSize == inSize)
{
chunk = compressed.data();
break;
}
}
// Put back unused bytes
if (in.ByteCount() > (currentBytes + copiedInSize))
in.BackUp(in.ByteCount() - currentBytes - copiedInSize);
if ((copiedInSize == 0 && chunkSize < inSize) || (copiedInSize > 0 && copiedInSize != inSize))
doThrow("lz4 decompress: insufficient input size");
return lz4Decompress(chunk, inSize, decompressed, decompressedSize);
}
} // compression
} // ripple
#endif //RIPPLED_COMPRESSIONALGORITHMS_H_INCLUDED

View File

@@ -171,6 +171,9 @@ public:
std::string SSL_VERIFY_FILE; std::string SSL_VERIFY_FILE;
std::string SSL_VERIFY_DIR; std::string SSL_VERIFY_DIR;
// Compression
bool COMPRESSION = false;
// Thread pool configuration // Thread pool configuration
std::size_t WORKERS = 0; std::size_t WORKERS = 0;

View File

@@ -37,6 +37,7 @@ struct ConfigSection
// VFALCO TODO Rename and replace these macros with variables. // VFALCO TODO Rename and replace these macros with variables.
#define SECTION_AMENDMENTS "amendments" #define SECTION_AMENDMENTS "amendments"
#define SECTION_CLUSTER_NODES "cluster_nodes" #define SECTION_CLUSTER_NODES "cluster_nodes"
#define SECTION_COMPRESSION "compression"
#define SECTION_DEBUG_LOGFILE "debug_logfile" #define SECTION_DEBUG_LOGFILE "debug_logfile"
#define SECTION_ELB_SUPPORT "elb_support" #define SECTION_ELB_SUPPORT "elb_support"
#define SECTION_FEE_DEFAULT "fee_default" #define SECTION_FEE_DEFAULT "fee_default"

View File

@@ -454,6 +454,9 @@ void Config::loadFromString (std::string const& fileContents)
if (getSingleSection (secConfig, SECTION_WORKERS, strTemp, j_)) if (getSingleSection (secConfig, SECTION_WORKERS, strTemp, j_))
WORKERS = beast::lexicalCastThrow <std::size_t> (strTemp); WORKERS = beast::lexicalCastThrow <std::size_t> (strTemp);
if (getSingleSection (secConfig, SECTION_COMPRESSION, strTemp, j_))
COMPRESSION = beast::lexicalCastThrow <bool> (strTemp);
// Do not load trusted validator configuration for standalone mode // Do not load trusted validator configuration for standalone mode
if (! RUN_STANDALONE) if (! RUN_STANDALONE)
{ {

View File

@@ -0,0 +1,103 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLED_COMPRESSION_H_INCLUDED
#define RIPPLED_COMPRESSION_H_INCLUDED
#include <ripple/basics/CompressionAlgorithms.h>
#include <ripple/basics/Log.h>
#include <lz4frame.h>
namespace ripple {
namespace compression {
std::size_t constexpr headerBytes = 6;
std::size_t constexpr headerBytesCompressed = 10;
enum class Algorithm : std::uint8_t {
None = 0x00,
LZ4 = 0x01
};
enum class Compressed : std::uint8_t {
On,
Off
};
/** Decompress input stream.
* @tparam InputStream ZeroCopyInputStream
* @param in Input source stream
* @param inSize Size of compressed data
* @param decompressed Buffer to hold decompressed message
* @param algorithm Compression algorithm type
* @return Size of decompressed data or zero if failed to decompress
*/
template<typename InputStream>
std::size_t
decompress(InputStream& in, std::size_t inSize, std::uint8_t* decompressed,
std::size_t decompressedSize, Algorithm algorithm = Algorithm::LZ4) {
try
{
if (algorithm == Algorithm::LZ4)
return ripple::compression_algorithms::lz4Decompress(in, inSize,
decompressed, decompressedSize);
else
{
JLOG(debugLog().warn()) << "decompress: invalid compression algorithm "
<< static_cast<int>(algorithm);
assert(0);
}
}
catch (...) {}
return 0;
}
/** Compress input data.
* @tparam BufferFactory Callable object or lambda.
* Takes the requested buffer size and returns allocated buffer pointer.
* @param in Data to compress
* @param inSize Size of the data
* @param bf Compressed buffer allocator
* @param algorithm Compression algorithm type
* @return Size of compressed data, or zero if failed to compress
*/
template<class BufferFactory>
std::size_t
compress(void const* in,
std::size_t inSize, BufferFactory&& bf, Algorithm algorithm = Algorithm::LZ4) {
try
{
if (algorithm == Algorithm::LZ4)
return ripple::compression_algorithms::lz4Compress(in, inSize, std::forward<BufferFactory>(bf));
else
{
JLOG(debugLog().warn()) << "compress: invalid compression algorithm"
<< static_cast<int>(algorithm);
assert(0);
}
}
catch (...) {}
return 0;
}
} // compression
} // ripple
#endif //RIPPLED_COMPRESSION_H_INCLUDED

View File

@@ -20,6 +20,7 @@
#ifndef RIPPLE_OVERLAY_MESSAGE_H_INCLUDED #ifndef RIPPLE_OVERLAY_MESSAGE_H_INCLUDED
#define RIPPLE_OVERLAY_MESSAGE_H_INCLUDED #define RIPPLE_OVERLAY_MESSAGE_H_INCLUDED
#include <ripple/overlay/Compression.h>
#include <ripple/protocol/messages.h> #include <ripple/protocol/messages.h>
#include <boost/asio/buffer.hpp> #include <boost/asio/buffer.hpp>
#include <boost/asio/buffers_iterator.hpp> #include <boost/asio/buffers_iterator.hpp>
@@ -47,27 +48,61 @@ namespace ripple {
class Message : public std::enable_shared_from_this <Message> class Message : public std::enable_shared_from_this <Message>
{ {
using Compressed = compression::Compressed;
using Algorithm = compression::Algorithm;
public: public:
/** Constructor
* @param message Protocol message to serialize
* @param type Protocol message type
*/
Message (::google::protobuf::Message const& message, int type); Message (::google::protobuf::Message const& message, int type);
public: /** Retrieve the packed message data. If compressed message is requested but the message
/** Retrieve the packed message data. */ * is not compressible then the uncompressed buffer is returned.
* @param compressed Request compressed (Compress::On) or
* uncompressed (Compress::Off) payload buffer
* @return Payload buffer
*/
std::vector <uint8_t> const& std::vector <uint8_t> const&
getBuffer () const getBuffer (Compressed tryCompressed);
{
return mBuffer;
}
/** Get the traffic category */ /** Get the traffic category */
std::size_t std::size_t
getCategory () const getCategory () const
{ {
return mCategory; return category_;
} }
private: private:
std::vector <uint8_t> mBuffer; std::vector <uint8_t> buffer_;
std::size_t mCategory; std::vector <uint8_t> bufferCompressed_;
std::size_t category_;
std::once_flag once_flag_;
/** Set the payload header
* @param in Pointer to the payload
* @param payloadBytes Size of the payload excluding the header size
* @param type Protocol message type
* @param comprAlgorithm Compression algorithm used in compression,
* currently LZ4 only. If None then the message is uncompressed.
* @param uncompressedBytes Size of the uncompressed message
*/
void setHeader(std::uint8_t* in, std::uint32_t payloadBytes, int type,
Algorithm comprAlgorithm, std::uint32_t uncompressedBytes);
/** Try to compress the payload.
* Can be called concurrently by multiple peers but is compressed once.
* If the message is not compressible then the serialized buffer_ is used.
*/
void compress();
/** Get the message type from the payload header.
* First four bytes are the compression/algorithm flag and the payload size.
* Next two bytes are the message type
* @param in Payload header pointer
* @return Message type
*/
int getType(std::uint8_t const* in) const;
}; };
} }

View File

@@ -197,7 +197,7 @@ ConnectAttempt::onHandshake (error_code ec)
if (! sharedValue) if (! sharedValue)
return close(); // makeSharedValue logs return close(); // makeSharedValue logs
req_ = makeRequest(!overlay_.peerFinder().config().peerPrivate); req_ = makeRequest(!overlay_.peerFinder().config().peerPrivate, app_.config().COMPRESSION);
buildHandshake(req_, *sharedValue, overlay_.setup().networkID, buildHandshake(req_, *sharedValue, overlay_.setup().networkID,
overlay_.setup().public_ip, remote_endpoint_.address(), app_); overlay_.setup().public_ip, remote_endpoint_.address(), app_);
@@ -264,7 +264,7 @@ ConnectAttempt::onShutdown (error_code ec)
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
auto auto
ConnectAttempt::makeRequest (bool crawl) -> request_type ConnectAttempt::makeRequest (bool crawl, bool compressionEnabled) -> request_type
{ {
request_type m; request_type m;
m.method(boost::beast::http::verb::get); m.method(boost::beast::http::verb::get);
@@ -275,6 +275,8 @@ ConnectAttempt::makeRequest (bool crawl) -> request_type
m.insert ("Connection", "Upgrade"); m.insert ("Connection", "Upgrade");
m.insert ("Connect-As", "Peer"); m.insert ("Connect-As", "Peer");
m.insert ("Crawl", crawl ? "public" : "private"); m.insert ("Crawl", crawl ? "public" : "private");
if (compressionEnabled)
m.insert("X-Offer-Compression", "lz4");
return m; return m;
} }

View File

@@ -93,7 +93,7 @@ private:
static static
request_type request_type
makeRequest (bool crawl); makeRequest (bool crawl, bool compressionEnabled);
void processResponse(); void processResponse();

View File

@@ -17,7 +17,6 @@
*/ */
//============================================================================== //==============================================================================
#include <ripple/basics/safe_cast.h>
#include <ripple/overlay/Message.h> #include <ripple/overlay/Message.h>
#include <ripple/overlay/impl/TrafficCount.h> #include <ripple/overlay/impl/TrafficCount.h>
#include <cstdint> #include <cstdint>
@@ -25,8 +24,9 @@
namespace ripple { namespace ripple {
Message::Message (::google::protobuf::Message const& message, int type) Message::Message (::google::protobuf::Message const& message, int type)
: mCategory(TrafficCount::categorize(message, type, false)) : category_(TrafficCount::categorize(message, type, false))
{ {
using namespace ripple::compression;
#if defined(GOOGLE_PROTOBUF_VERSION) && (GOOGLE_PROTOBUF_VERSION >= 3011000) #if defined(GOOGLE_PROTOBUF_VERSION) && (GOOGLE_PROTOBUF_VERSION >= 3011000)
auto const messageBytes = message.ByteSizeLong (); auto const messageBytes = message.ByteSizeLong ();
@@ -36,23 +36,129 @@ Message::Message (::google::protobuf::Message const& message, int type)
assert (messageBytes != 0); assert (messageBytes != 0);
/** Number of bytes in a message header. */ buffer_.resize (headerBytes + messageBytes);
std::size_t constexpr headerBytes = 6;
mBuffer.resize (headerBytes + messageBytes); setHeader(buffer_.data(), messageBytes, type, Algorithm::None, 0);
auto ptr = mBuffer.data();
*ptr++ = static_cast<std::uint8_t>((messageBytes >> 24) & 0xFF);
*ptr++ = static_cast<std::uint8_t>((messageBytes >> 16) & 0xFF);
*ptr++ = static_cast<std::uint8_t>((messageBytes >> 8) & 0xFF);
*ptr++ = static_cast<std::uint8_t>(messageBytes & 0xFF);
*ptr++ = static_cast<std::uint8_t>((type >> 8) & 0xFF);
*ptr++ = static_cast<std::uint8_t> (type & 0xFF);
if (messageBytes != 0) if (messageBytes != 0)
message.SerializeToArray(ptr, messageBytes); message.SerializeToArray(buffer_.data() + headerBytes, messageBytes);
}
void
Message::compress()
{
using namespace ripple::compression;
auto const messageBytes = buffer_.size () - headerBytes;
auto type = getType(buffer_.data());
bool const compressible = [&]{
if (messageBytes <= 70)
return false;
switch(type)
{
case protocol::mtMANIFESTS:
case protocol::mtENDPOINTS:
case protocol::mtTRANSACTION:
case protocol::mtGET_LEDGER:
case protocol::mtLEDGER_DATA:
case protocol::mtGET_OBJECTS:
case protocol::mtVALIDATORLIST:
return true;
case protocol::mtPING:
case protocol::mtCLUSTER:
case protocol::mtPROPOSE_LEDGER:
case protocol::mtSTATUS_CHANGE:
case protocol::mtHAVE_SET:
case protocol::mtVALIDATION:
case protocol::mtGET_SHARD_INFO:
case protocol::mtSHARD_INFO:
case protocol::mtGET_PEER_SHARD_INFO:
case protocol::mtPEER_SHARD_INFO:
break;
}
return false;
}();
if (compressible)
{
auto payload = static_cast<void const*>(buffer_.data() + headerBytes);
auto compressedSize = ripple::compression::compress(
payload,
messageBytes,
[&](std::size_t inSize) { // size of required compressed buffer
bufferCompressed_.resize(inSize + headerBytesCompressed);
return (bufferCompressed_.data() + headerBytesCompressed);
});
if (compressedSize < (messageBytes - (headerBytesCompressed - headerBytes)))
{
bufferCompressed_.resize(headerBytesCompressed + compressedSize);
setHeader(bufferCompressed_.data(), compressedSize, type, Algorithm::LZ4, messageBytes);
}
else
bufferCompressed_.resize(0);
}
}
/** Set payload header
* Uncompressed message header
* 47-42 Set to 0
* 41-16 Payload size
* 15-0 Message Type
* Compressed message header
* 79 Set to 0, indicates the message is compressed
* 78-76 Compression algorithm, value 1-7. Set to 1 to indicate LZ4 compression
* 75-74 Set to 0
* 73-48 Payload size
* 47-32 Message Type
* 31-0 Uncompressed message size
*/
void
Message::setHeader(std::uint8_t* in, std::uint32_t payloadBytes, int type,
Algorithm comprAlgorithm, std::uint32_t uncompressedBytes)
{
auto h = in;
auto pack = [](std::uint8_t*& in, std::uint32_t size) {
*in++ = static_cast<std::uint8_t>((size >> 24) & 0x0F); // leftmost 4 are compression bits
*in++ = static_cast<std::uint8_t>((size >> 16) & 0xFF);
*in++ = static_cast<std::uint8_t>((size >> 8) & 0xFF);
*in++ = static_cast<std::uint8_t>(size & 0xFF);
};
pack(in, payloadBytes);
*in++ = static_cast<std::uint8_t>((type >> 8) & 0xFF);
*in++ = static_cast<std::uint8_t> (type & 0xFF);
if (comprAlgorithm != Algorithm::None)
{
pack(in, uncompressedBytes);
*h |= 0x80 | (static_cast<uint8_t>(comprAlgorithm) << 4);
}
}
std::vector <uint8_t> const&
Message::getBuffer (Compressed tryCompressed)
{
if (tryCompressed == Compressed::Off)
return buffer_;
std::call_once(once_flag_, &Message::compress, this);
if (bufferCompressed_.size() > 0)
return bufferCompressed_;
else
return buffer_;
}
int
Message::getType(std::uint8_t const* in) const
{
int type = (static_cast<int>(*(in + 4)) << 8) + *(in + 5);
return type;
} }
} }

View File

@@ -86,6 +86,7 @@ PeerImp::PeerImp (Application& app, id_t id,
, slot_ (slot) , slot_ (slot)
, request_(std::move(request)) , request_(std::move(request))
, headers_(request_) , headers_(request_)
, compressionEnabled_(headers_["X-Offer-Compression"] == "lz4" ? Compressed::On : Compressed::Off)
{ {
} }
@@ -219,7 +220,7 @@ PeerImp::send (std::shared_ptr<Message> const& m)
overlay_.reportTraffic ( overlay_.reportTraffic (
safe_cast<TrafficCount::category>(m->getCategory()), safe_cast<TrafficCount::category>(m->getCategory()),
false, static_cast<int>(m->getBuffer().size())); false, static_cast<int>(m->getBuffer(compressionEnabled_).size()));
auto sendq_size = send_queue_.size(); auto sendq_size = send_queue_.size();
@@ -246,7 +247,7 @@ PeerImp::send (std::shared_ptr<Message> const& m)
boost::asio::async_write( boost::asio::async_write(
stream_, stream_,
boost::asio::buffer(send_queue_.front()->getBuffer()), boost::asio::buffer(send_queue_.front()->getBuffer(compressionEnabled_)),
bind_executor( bind_executor(
strand_, strand_,
std::bind( std::bind(
@@ -757,6 +758,8 @@ PeerImp::makeResponse (bool crawl,
resp.insert("Connect-As", "Peer"); resp.insert("Connect-As", "Peer");
resp.insert("Server", BuildInfo::getFullVersionString()); resp.insert("Server", BuildInfo::getFullVersionString());
resp.insert("Crawl", crawl ? "public" : "private"); resp.insert("Crawl", crawl ? "public" : "private");
if (req["X-Offer-Compression"] == "lz4" && app_.config().COMPRESSION)
resp.insert("X-Offer-Compression", "lz4");
buildHandshake(resp, sharedValue, overlay_.setup().networkID, buildHandshake(resp, sharedValue, overlay_.setup().networkID,
overlay_.setup().public_ip, remote_ip, app_); overlay_.setup().public_ip, remote_ip, app_);
@@ -945,7 +948,7 @@ PeerImp::onWriteMessage (error_code ec, std::size_t bytes_transferred)
// Timeout on writes only // Timeout on writes only
return boost::asio::async_write( return boost::asio::async_write(
stream_, stream_,
boost::asio::buffer(send_queue_.front()->getBuffer()), boost::asio::buffer(send_queue_.front()->getBuffer(compressionEnabled_)),
bind_executor( bind_executor(
strand_, strand_,
std::bind( std::bind(

View File

@@ -99,6 +99,7 @@ private:
using address_type = boost::asio::ip::address; using address_type = boost::asio::ip::address;
using endpoint_type = boost::asio::ip::tcp::endpoint; using endpoint_type = boost::asio::ip::tcp::endpoint;
using waitable_timer = boost::asio::basic_waitable_timer<std::chrono::steady_clock>; using waitable_timer = boost::asio::basic_waitable_timer<std::chrono::steady_clock>;
using Compressed = compression::Compressed;
Application& app_; Application& app_;
id_t const id_; id_t const id_;
@@ -201,6 +202,8 @@ private:
std::mutex mutable shardInfoMutex_; std::mutex mutable shardInfoMutex_;
hash_map<PublicKey, ShardInfo> shardInfo_; hash_map<PublicKey, ShardInfo> shardInfo_;
Compressed compressionEnabled_ = Compressed::Off;
friend class OverlayImpl; friend class OverlayImpl;
class Metrics { class Metrics {
@@ -600,6 +603,9 @@ PeerImp::PeerImp (Application& app, std::unique_ptr<stream_type>&& stream_ptr,
, slot_ (std::move(slot)) , slot_ (std::move(slot))
, response_(std::move(response)) , response_(std::move(response))
, headers_(response_) , headers_(response_)
, compressionEnabled_(
headers_["X-Offer-Compression"] == "lz4" && app_.config().COMPRESSION
? Compressed::On : Compressed::Off)
{ {
read_buffer_.commit (boost::asio::buffer_copy(read_buffer_.prepare( read_buffer_.commit (boost::asio::buffer_copy(read_buffer_.prepare(
boost::asio::buffer_size(buffers)), buffers)); boost::asio::buffer_size(buffers)), buffers));

View File

@@ -22,6 +22,7 @@
#include <ripple/basics/ByteUtilities.h> #include <ripple/basics/ByteUtilities.h>
#include <ripple/protocol/messages.h> #include <ripple/protocol/messages.h>
#include <ripple/overlay/Compression.h>
#include <ripple/overlay/Message.h> #include <ripple/overlay/Message.h>
#include <ripple/overlay/impl/ZeroCopyStream.h> #include <ripple/overlay/impl/ZeroCopyStream.h>
#include <boost/asio/buffer.hpp> #include <boost/asio/buffer.hpp>
@@ -81,36 +82,66 @@ struct MessageHeader
/** The size of the payload on the wire. */ /** The size of the payload on the wire. */
std::uint32_t payload_wire_size = 0; std::uint32_t payload_wire_size = 0;
/** Uncompressed message size if the message is compressed. */
std::uint32_t uncompressed_size = 0;
/** The type of the message. */ /** The type of the message. */
std::uint16_t message_type = 0; std::uint16_t message_type = 0;
/** Indicates which compression algorithm the payload is compressed with.
* Currenly only lz4 is supported. If None then the message is not compressed.
*/
compression::Algorithm algorithm = compression::Algorithm::None;
}; };
template<typename BufferSequence>
auto
buffersBegin(BufferSequence const &bufs)
{
return boost::asio::buffers_iterator<BufferSequence, std::uint8_t>::begin(bufs);
}
template <class BufferSequence> template <class BufferSequence>
boost::optional<MessageHeader> parseMessageHeader( boost::optional<MessageHeader> parseMessageHeader(
BufferSequence const& bufs, BufferSequence const& bufs,
std::size_t size) std::size_t size)
{ {
auto iter = boost::asio::buffers_iterator<BufferSequence, std::uint8_t>::begin(bufs); using namespace ripple::compression;
auto iter = buffersBegin(bufs);
MessageHeader hdr; MessageHeader hdr;
auto const compressed = (*iter & 0x80) == 0x80;
// Version 1 header: uncompressed payload. // Check valid header
// The top six bits of the first byte are 0. if ((*iter & 0xFC) == 0 || compressed)
if ((*iter & 0xFC) == 0)
{ {
hdr.header_size = 6; hdr.header_size = compressed ? headerBytesCompressed : headerBytes;
if (size < hdr.header_size) if (size < hdr.header_size)
return {}; return {};
if (compressed)
{
uint8_t algorithm = (*iter & 0x70) >> 4;
if (algorithm != static_cast<std::uint8_t>(compression::Algorithm::LZ4))
return {};
hdr.algorithm = compression::Algorithm::LZ4;
}
for (int i = 0; i != 4; ++i) for (int i = 0; i != 4; ++i)
hdr.payload_wire_size = (hdr.payload_wire_size << 8) + *iter++; hdr.payload_wire_size = (hdr.payload_wire_size << 8) + *iter++;
// clear the compression bits
hdr.payload_wire_size &= 0x03FFFFFF;
hdr.total_wire_size = hdr.header_size + hdr.payload_wire_size; hdr.total_wire_size = hdr.header_size + hdr.payload_wire_size;
for (int i = 0; i != 2; ++i) for (int i = 0; i != 2; ++i)
hdr.message_type = (hdr.message_type << 8) + *iter++; hdr.message_type = (hdr.message_type << 8) + *iter++;
if (compressed)
for (int i = 0; i != 4; ++i)
hdr.uncompressed_size = (hdr.uncompressed_size << 8) + *iter++;
return hdr; return hdr;
} }
@@ -130,7 +161,22 @@ invoke (
ZeroCopyInputStream<Buffers> stream(buffers); ZeroCopyInputStream<Buffers> stream(buffers);
stream.Skip(header.header_size); stream.Skip(header.header_size);
if (! m->ParseFromZeroCopyStream(&stream)) if (header.algorithm != compression::Algorithm::None)
{
std::vector<std::uint8_t> payload;
payload.resize(header.uncompressed_size);
auto payloadSize = ripple::compression::decompress(
stream,
header.payload_wire_size,
payload.data(),
header.uncompressed_size,
header.algorithm);
if (payloadSize == 0 || !m->ParseFromArray(payload.data(), payloadSize))
return false;
}
else if (!m->ParseFromZeroCopyStream(&stream))
return false; return false;
handler.onMessageBegin (header.message_type, m, header.payload_wire_size); handler.onMessageBegin (header.message_type, m, header.payload_wire_size);

View File

@@ -0,0 +1,376 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/app/misc/Manifest.h>
#include <ripple/app/ledger/Ledger.h>
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/beast/unit_test.h>
#include <ripple/beast/utility/Journal.h>
#include <ripple/core/TimeKeeper.h>
#include <ripple/overlay/Compression.h>
#include <ripple/overlay/impl/ZeroCopyStream.h>
#include <ripple/overlay/impl/ProtocolMessage.h>
#include <ripple/overlay/Message.h>
#include <ripple/protocol/digest.h>
#include <ripple/protocol/HashPrefix.h>
#include <ripple/protocol/jss.h>
#include <ripple/protocol/PublicKey.h>
#include <ripple/protocol/SecretKey.h>
#include <ripple/protocol/Sign.h>
#include <ripple/shamap/SHAMapNodeID.h>
#include <ripple.pb.h>
#include <boost/asio/ip/address_v4.hpp>
#include <boost/beast/core/multi_buffer.hpp>
#include <boost/endian/conversion.hpp>
#include <test/jtx/Account.h>
#include <test/jtx/amount.h>
#include <test/jtx/Env.h>
#include <test/jtx/pay.h>
#include <test/jtx/WSClient.h>
#include <algorithm>
namespace ripple {
namespace test {
using namespace ripple::test;
using namespace ripple::test::jtx;
static
uint256
ledgerHash (LedgerInfo const& info)
{
return ripple::sha512Half(
HashPrefix::ledgerMaster,
std::uint32_t(info.seq),
std::uint64_t(info.drops.drops ()),
info.parentHash,
info.txHash,
info.accountHash,
std::uint32_t(info.parentCloseTime.time_since_epoch().count()),
std::uint32_t(info.closeTime.time_since_epoch().count()),
std::uint8_t(info.closeTimeResolution.count()),
std::uint8_t(info.closeFlags));
}
class compression_test : public beast::unit_test::suite {
using Compressed = compression::Compressed;
using Algorithm = compression::Algorithm;
public:
compression_test() {}
template<typename T>
void
doTest(std::shared_ptr<T> proto, protocol::MessageType mt, uint16_t nbuffers, const char *msg,
bool log = false) {
if (log)
printf("=== compress/decompress %s ===\n", msg);
Message m(*proto, mt);
auto &buffer = m.getBuffer(Compressed::On);
if (log)
printf("==> compressed, original %d bytes, compressed %d bytes\n",
(int)m.getBuffer(Compressed::Off).size(),
(int)m.getBuffer(Compressed::On).size());
boost::beast::multi_buffer buffers;
// simulate multi-buffer
auto sz = buffer.size() / nbuffers;
for (int i = 0; i < nbuffers; i++) {
auto start = buffer.begin() + sz * i;
auto end = i < nbuffers - 1 ? (buffer.begin() + sz * (i + 1)) : buffer.end();
std::vector<std::uint8_t> slice(start, end);
buffers.commit(
boost::asio::buffer_copy(buffers.prepare(slice.size()), boost::asio::buffer(slice)));
}
auto header = ripple::detail::parseMessageHeader(buffers.data(), buffer.size());
if (log)
printf("==> parsed header: buffers size %d, compressed %d, algorithm %d, header size %d, payload size %d, buffer size %d\n",
(int)buffers.size(), header->algorithm != Algorithm::None, (int)header->algorithm,
(int)header->header_size, (int)header->payload_wire_size, (int)buffer.size());
if (header->algorithm == Algorithm::None) {
if (log)
printf("==> NOT COMPRESSED\n");
return;
}
std::vector<std::uint8_t> decompressed;
decompressed.resize(header->uncompressed_size);
BEAST_EXPECT(header->payload_wire_size == buffer.size() - header->header_size);
ZeroCopyInputStream stream(buffers.data());
stream.Skip(header->header_size);
auto decompressedSize = ripple::compression::decompress(stream, header->payload_wire_size,
decompressed.data(), header->uncompressed_size);
BEAST_EXPECT(decompressedSize == header->uncompressed_size);
auto const proto1 = std::make_shared<T>();
BEAST_EXPECT(proto1->ParseFromArray(decompressed.data(), decompressedSize));
auto uncompressed = m.getBuffer(Compressed::Off);
BEAST_EXPECT(std::equal(uncompressed.begin() + ripple::compression::headerBytes,
uncompressed.end(),
decompressed.begin()));
if (log)
printf("\n");
}
std::shared_ptr<protocol::TMManifests>
buildManifests(int n) {
auto manifests = std::make_shared<protocol::TMManifests>();
manifests->mutable_list()->Reserve(n);
for (int i = 0; i < n; i++) {
auto master = randomKeyPair(KeyType::ed25519);
auto signing = randomKeyPair(KeyType::ed25519);
STObject st(sfGeneric);
st[sfSequence] = i;
st[sfPublicKey] = std::get<0>(master);
st[sfSigningPubKey] = std::get<0>(signing);
st[sfDomain] = makeSlice(std::string("example") + std::to_string(i) + std::string(".com"));
sign(st, HashPrefix::manifest, KeyType::ed25519, std::get<1>(master), sfMasterSignature);
sign(st, HashPrefix::manifest, KeyType::ed25519, std::get<1>(signing));
Serializer s;
st.add(s);
auto *manifest = manifests->add_list();
manifest->set_stobject(s.data(), s.size());
}
return manifests;
}
std::shared_ptr<protocol::TMEndpoints>
buildEndpoints(int n) {
auto endpoints = std::make_shared<protocol::TMEndpoints>();
endpoints->mutable_endpoints()->Reserve(n);
for (int i = 0; i < n; i++) {
auto *endpoint = endpoints->add_endpoints();
endpoint->set_hops(i);
std::string addr = std::string("10.0.1.") + std::to_string(i);
endpoint->mutable_ipv4()->set_ipv4(
boost::endian::native_to_big(boost::asio::ip::address_v4::from_string(addr).to_uint()));
endpoint->mutable_ipv4()->set_ipv4port(i);
}
endpoints->set_version(2);
return endpoints;
}
std::shared_ptr<protocol::TMTransaction>
buildTransaction(Logs &logs) {
Env env(*this, envconfig());
int fund = 10000;
auto const alice = Account("alice");
auto const bob = Account("bob");
env.fund(XRP(fund), "alice", "bob");
env.trust(bob["USD"](fund), alice);
env.close();
auto toBinary = [](std::string const &text) {
std::string binary;
for (size_t i = 0; i < text.size(); ++i) {
unsigned int c = charUnHex(text[i]);
c = c << 4;
++i;
c = c | charUnHex(text[i]);
binary.push_back(c);
}
return binary;
};
std::string usdTxBlob = "";
auto wsc = makeWSClient(env.app().config());
{
Json::Value jrequestUsd;
jrequestUsd[jss::secret] = toBase58(generateSeed("bob"));
jrequestUsd[jss::tx_json] =
pay("bob", "alice", bob["USD"](fund / 2));
Json::Value jreply_usd = wsc->invoke("sign", jrequestUsd);
usdTxBlob =
toBinary(jreply_usd[jss::result][jss::tx_blob].asString());
}
auto transaction = std::make_shared<protocol::TMTransaction>();
transaction->set_rawtransaction(usdTxBlob);
transaction->set_status(protocol::tsNEW);
auto tk = make_TimeKeeper(logs.journal("TimeKeeper"));
transaction->set_receivetimestamp(tk->now().time_since_epoch().count());
transaction->set_deferred(true);
return transaction;
}
std::shared_ptr<protocol::TMGetLedger>
buildGetLedger() {
auto getLedger = std::make_shared<protocol::TMGetLedger>();
getLedger->set_itype(protocol::liTS_CANDIDATE);
getLedger->set_ltype(protocol::TMLedgerType::ltACCEPTED);
uint256 const hash(ripple::sha512Half(123456789));
getLedger->set_ledgerhash(hash.begin(), hash.size());
getLedger->set_ledgerseq(123456789);
ripple::SHAMapNodeID sha(hash.data(), hash.size());
getLedger->add_nodeids(sha.getRawString());
getLedger->set_requestcookie(123456789);
getLedger->set_querytype(protocol::qtINDIRECT);
getLedger->set_querydepth(3);
return getLedger;
}
std::shared_ptr<protocol::TMLedgerData>
buildLedgerData(uint32_t n, Logs &logs) {
auto ledgerData = std::make_shared<protocol::TMLedgerData>();
uint256 const hash(ripple::sha512Half(12356789));
ledgerData->set_ledgerhash(hash.data(), hash.size());
ledgerData->set_ledgerseq(123456789);
ledgerData->set_type(protocol::TMLedgerInfoType::liAS_NODE);
ledgerData->set_requestcookie(123456789);
ledgerData->set_error(protocol::TMReplyError::reNO_LEDGER);
ledgerData->mutable_nodes()->Reserve(n);
uint256 parentHash(0);
for (int i = 0; i < n; i++) {
LedgerInfo info;
auto tk = make_TimeKeeper(logs.journal("TimeKeeper"));
info.seq = i;
info.parentCloseTime = tk->now();
info.hash = ripple::sha512Half(i);
info.txHash = ripple::sha512Half(i + 1);
info.accountHash = ripple::sha512Half(i + 2);
info.parentHash = parentHash;
info.drops = XRPAmount(10);
info.closeTimeResolution = tk->now().time_since_epoch();
info.closeTime = tk->now();
parentHash = ledgerHash(info);
Serializer nData;
ripple::addRaw(info, nData);
ledgerData->add_nodes()->set_nodedata(nData.getDataPtr(), nData.getLength());
}
return ledgerData;
}
std::shared_ptr<protocol::TMGetObjectByHash>
buildGetObjectByHash() {
auto getObject = std::make_shared<protocol::TMGetObjectByHash>();
getObject->set_type(protocol::TMGetObjectByHash_ObjectType::TMGetObjectByHash_ObjectType_otTRANSACTION);
getObject->set_query(true);
getObject->set_seq(123456789);
uint256 hash(ripple::sha512Half(123456789));
getObject->set_ledgerhash(hash.data(), hash.size());
getObject->set_fat(true);
for (int i = 0; i < 100; i++) {
uint256 hash(ripple::sha512Half(i));
auto object = getObject->add_objects();
object->set_hash(hash.data(), hash.size());
ripple::SHAMapNodeID sha(hash.data(), hash.size());
object->set_nodeid(sha.getRawString());
object->set_index("");
object->set_data("");
object->set_ledgerseq(i);
}
return getObject;
}
std::shared_ptr<protocol::TMValidatorList>
buildValidatorList()
{
auto list = std::make_shared<protocol::TMValidatorList>();
auto master = randomKeyPair(KeyType::ed25519);
auto signing = randomKeyPair(KeyType::ed25519);
STObject st(sfGeneric);
st[sfSequence] = 0;
st[sfPublicKey] = std::get<0>(master);
st[sfSigningPubKey] = std::get<0>(signing);
st[sfDomain] = makeSlice(std::string("example.com"));
sign(st, HashPrefix::manifest, KeyType::ed25519, std::get<1>(master), sfMasterSignature);
sign(st, HashPrefix::manifest, KeyType::ed25519, std::get<1>(signing));
Serializer s;
st.add(s);
list->set_manifest(s.data(), s.size());
list->set_version(3);
STObject signature(sfSignature);
ripple::sign(st, HashPrefix::manifest,KeyType::ed25519, std::get<1>(signing));
Serializer s1;
st.add(s1);
list->set_signature(s1.data(), s1.size());
list->set_blob(strHex(s.getString()));
return list;
}
void
testProtocol() {
testcase("Message Compression");
auto thresh = beast::severities::Severity::kInfo;
auto logs = std::make_unique<Logs>(thresh);
protocol::TMManifests manifests;
protocol::TMEndpoints endpoints;
protocol::TMTransaction transaction;
protocol::TMGetLedger get_ledger;
protocol::TMLedgerData ledger_data;
protocol::TMGetObjectByHash get_object;
protocol::TMValidatorList validator_list;
// 4.5KB
doTest(buildManifests(20), protocol::mtMANIFESTS, 4, "TMManifests20");
// 22KB
doTest(buildManifests(100), protocol::mtMANIFESTS, 4, "TMManifests100");
// 131B
doTest(buildEndpoints(10), protocol::mtENDPOINTS, 4, "TMEndpoints10");
// 1.3KB
doTest(buildEndpoints(100), protocol::mtENDPOINTS, 4, "TMEndpoints100");
// 242B
doTest(buildTransaction(*logs), protocol::mtTRANSACTION, 1, "TMTransaction");
// 87B
doTest(buildGetLedger(), protocol::mtGET_LEDGER, 1, "TMGetLedger");
// 61KB
doTest(buildLedgerData(500, *logs), protocol::mtLEDGER_DATA, 10, "TMLedgerData500");
// 122 KB
doTest(buildLedgerData(1000, *logs), protocol::mtLEDGER_DATA, 20, "TMLedgerData1000");
// 1.2MB
doTest(buildLedgerData(10000, *logs), protocol::mtLEDGER_DATA, 50, "TMLedgerData10000");
// 12MB
doTest(buildLedgerData(100000, *logs), protocol::mtLEDGER_DATA, 100, "TMLedgerData100000");
// 61MB
doTest(buildLedgerData(500000, *logs), protocol::mtLEDGER_DATA, 100, "TMLedgerData500000");
// 7.7KB
doTest(buildGetObjectByHash(), protocol::mtGET_OBJECTS, 4, "TMGetObjectByHash");
// 895B
doTest(buildValidatorList(), protocol::mtVALIDATORLIST, 4, "TMValidatorList");
}
void run() override {
testProtocol();
}
};
BEAST_DEFINE_TESTSUITE_MANUAL_PRIO(compression, ripple_data, ripple, 20);
}
}