rippled
Loading...
Searching...
No Matches
Message.cpp
1#include <xrpld/overlay/Message.h>
2#include <xrpld/overlay/detail/TrafficCount.h>
3
4#include <cstdint>
5
6namespace xrpl {
7
9 ::google::protobuf::Message const& message,
10 protocol::MessageType type,
11 std::optional<PublicKey> const& validator)
12 : category_(TrafficCount::categorize(message, type, false)), validatorKey_(validator)
13{
14 using namespace xrpl::compression;
15
16 auto const messageBytes = messageSize(message);
17
18 XRPL_ASSERT(messageBytes, "xrpl::Message::Message : non-empty message input");
19
20 buffer_.resize(headerBytes + messageBytes);
21
22 setHeader(buffer_.data(), messageBytes, type, Algorithm::None, 0);
23
24 if (messageBytes != 0)
25 message.SerializeToArray(buffer_.data() + headerBytes, messageBytes);
26
27 XRPL_ASSERT(getBufferSize() == totalSize(message), "xrpl::Message::Message : message size matches the buffer");
28}
29
30// static
32Message::messageSize(::google::protobuf::Message const& message)
33{
34#if defined(GOOGLE_PROTOBUF_VERSION) && (GOOGLE_PROTOBUF_VERSION >= 3011000)
35 return message.ByteSizeLong();
36#else
37 return message.ByteSize();
38#endif
39}
40
41// static
43Message::totalSize(::google::protobuf::Message const& message)
44{
45 return messageSize(message) + compression::headerBytes;
46}
47
48void
50{
51 using namespace xrpl::compression;
52 auto const messageBytes = buffer_.size() - headerBytes;
53
54 auto type = getType(buffer_.data());
55
56 bool const compressible = [&] {
57 if (messageBytes <= 70)
58 return false;
59 switch (type)
60 {
61 case protocol::mtMANIFESTS:
62 case protocol::mtENDPOINTS:
63 case protocol::mtTRANSACTION:
64 case protocol::mtGET_LEDGER:
65 case protocol::mtLEDGER_DATA:
66 case protocol::mtGET_OBJECTS:
67 case protocol::mtVALIDATOR_LIST:
68 case protocol::mtVALIDATOR_LIST_COLLECTION:
69 case protocol::mtREPLAY_DELTA_RESPONSE:
70 case protocol::mtTRANSACTIONS:
71 return true;
72 case protocol::mtPING:
73 case protocol::mtCLUSTER:
74 case protocol::mtPROPOSE_LEDGER:
75 case protocol::mtSTATUS_CHANGE:
76 case protocol::mtHAVE_SET:
77 case protocol::mtVALIDATION:
78 case protocol::mtPROOF_PATH_REQ:
79 case protocol::mtPROOF_PATH_RESPONSE:
80 case protocol::mtREPLAY_DELTA_REQ:
81 case protocol::mtHAVE_TRANSACTIONS:
82 break;
83 }
84 return false;
85 }();
86
87 if (compressible)
88 {
89 auto payload = static_cast<void const*>(buffer_.data() + headerBytes);
90
91 auto compressedSize = xrpl::compression::compress(
92 payload,
93 messageBytes,
94 [&](std::size_t inSize) { // size of required compressed buffer
95 bufferCompressed_.resize(inSize + headerBytesCompressed);
96 return (bufferCompressed_.data() + headerBytesCompressed);
97 });
98
99 if (compressedSize < (messageBytes - (headerBytesCompressed - headerBytes)))
100 {
101 bufferCompressed_.resize(headerBytesCompressed + compressedSize);
102 setHeader(bufferCompressed_.data(), compressedSize, type, Algorithm::LZ4, messageBytes);
103 }
104 else
106 }
107}
108
144void
147 std::uint32_t payloadBytes,
148 int type,
149 Algorithm compression,
150 std::uint32_t uncompressedBytes)
151{
152 auto h = in;
153
154 auto pack = [](std::uint8_t*& in, std::uint32_t size) {
155 *in++ = static_cast<std::uint8_t>((size >> 24) & 0x0F); // leftmost 4 are compression bits
156 *in++ = static_cast<std::uint8_t>((size >> 16) & 0xFF);
157 *in++ = static_cast<std::uint8_t>((size >> 8) & 0xFF);
158 *in++ = static_cast<std::uint8_t>(size & 0xFF);
159 };
160
161 pack(in, payloadBytes);
162
163 *in++ = static_cast<std::uint8_t>((type >> 8) & 0xFF);
164 *in++ = static_cast<std::uint8_t>(type & 0xFF);
165
166 if (compression != Algorithm::None)
167 {
168 pack(in, uncompressedBytes);
169 *h |= static_cast<std::uint8_t>(compression);
170 }
171}
172
175{
176 return buffer_.size();
177}
178
181{
182 if (tryCompressed == Compressed::Off)
183 return buffer_;
184
186
187 if (bufferCompressed_.size() > 0)
188 return bufferCompressed_;
189 else
190 return buffer_;
191}
192
193int
195{
196 int type = (static_cast<int>(*(in + 4)) << 8) + *(in + 5);
197 return type;
198}
199
200} // namespace xrpl
T call_once(T... args)
int getType(std::uint8_t const *in) const
Get the message type from the payload header.
Definition Message.cpp:194
Message(::google::protobuf::Message const &message, protocol::MessageType type, std::optional< PublicKey > const &validator={})
Constructor.
Definition Message.cpp:8
std::once_flag once_flag_
Definition Message.h:84
static std::size_t totalSize(::google::protobuf::Message const &message)
Definition Message.cpp:43
std::vector< uint8_t > const & getBuffer(Compressed tryCompressed)
Retrieve the packed message data.
Definition Message.cpp:180
std::vector< uint8_t > buffer_
Definition Message.h:81
void setHeader(std::uint8_t *in, std::uint32_t payloadBytes, int type, Algorithm compression, std::uint32_t uncompressedBytes)
Set the payload header.
Definition Message.cpp:145
void compress()
Try to compress the payload.
Definition Message.cpp:49
static std::size_t messageSize(::google::protobuf::Message const &message)
Definition Message.cpp:32
std::size_t getBufferSize()
Retrieve the size of the packed but uncompressed message data.
Definition Message.cpp:174
std::vector< uint8_t > bufferCompressed_
Definition Message.h:82
TrafficCount is used to count ingress and egress wire bytes and number of messages.
T data(T... args)
std::size_t compress(void const *in, std::size_t inSize, BufferFactory &&bf, Algorithm algorithm=Algorithm::LZ4)
Compress input data.
Definition Compression.h:68
std::size_t constexpr headerBytes
Definition Compression.h:11
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
T resize(T... args)
T size(T... args)