rippled
ProtocolMessage.h
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #ifndef RIPPLE_OVERLAY_PROTOCOLMESSAGE_H_INCLUDED
21 #define RIPPLE_OVERLAY_PROTOCOLMESSAGE_H_INCLUDED
22 
23 #include <ripple/basics/ByteUtilities.h>
24 #include <ripple/overlay/Compression.h>
25 #include <ripple/overlay/Message.h>
26 #include <ripple/overlay/impl/ZeroCopyStream.h>
27 #include <ripple/protocol/messages.h>
28 #include <boost/asio/buffer.hpp>
29 #include <boost/asio/buffers_iterator.hpp>
30 #include <boost/system/error_code.hpp>
31 #include <cassert>
32 #include <cstdint>
33 #include <memory>
34 #include <optional>
35 #include <type_traits>
36 #include <vector>
37 
38 namespace ripple {
39 
40 inline protocol::MessageType
41 protocolMessageType(protocol::TMGetLedger const&)
42 {
43  return protocol::mtGET_LEDGER;
44 }
45 
46 inline protocol::MessageType
47 protocolMessageType(protocol::TMReplayDeltaRequest const&)
48 {
49  return protocol::mtREPLAY_DELTA_REQ;
50 }
51 
52 inline protocol::MessageType
53 protocolMessageType(protocol::TMProofPathRequest const&)
54 {
55  return protocol::mtPROOF_PATH_REQ;
56 }
57 
59 template <class = void>
62 {
63  switch (type)
64  {
65  case protocol::mtMANIFESTS:
66  return "manifests";
67  case protocol::mtPING:
68  return "ping";
69  case protocol::mtCLUSTER:
70  return "cluster";
71  case protocol::mtENDPOINTS:
72  return "endpoints";
73  case protocol::mtTRANSACTION:
74  return "tx";
75  case protocol::mtGET_LEDGER:
76  return "get_ledger";
77  case protocol::mtLEDGER_DATA:
78  return "ledger_data";
79  case protocol::mtPROPOSE_LEDGER:
80  return "propose";
81  case protocol::mtSTATUS_CHANGE:
82  return "status";
83  case protocol::mtHAVE_SET:
84  return "have_set";
85  case protocol::mtVALIDATORLIST:
86  return "validator_list";
87  case protocol::mtVALIDATORLISTCOLLECTION:
88  return "validator_list_collection";
89  case protocol::mtVALIDATION:
90  return "validation";
91  case protocol::mtGET_PEER_SHARD_INFO:
92  return "get_peer_shard_info";
93  case protocol::mtPEER_SHARD_INFO:
94  return "peer_shard_info";
95  case protocol::mtGET_OBJECTS:
96  return "get_objects";
97  case protocol::mtHAVE_TRANSACTIONS:
98  return "have_transactions";
99  case protocol::mtTRANSACTIONS:
100  return "transactions";
101  case protocol::mtSQUELCH:
102  return "squelch";
103  case protocol::mtPROOF_PATH_REQ:
104  return "proof_path_request";
105  case protocol::mtPROOF_PATH_RESPONSE:
106  return "proof_path_response";
107  case protocol::mtREPLAY_DELTA_REQ:
108  return "replay_delta_request";
109  case protocol::mtREPLAY_DELTA_RESPONSE:
110  return "replay_delta_response";
111  case protocol::mtGET_PEER_SHARD_INFO_V2:
112  return "get_peer_shard_info_v2";
113  case protocol::mtPEER_SHARD_INFO_V2:
114  return "peer_shard_info_v2";
115  default:
116  break;
117  }
118  return "unknown";
119 }
120 
121 namespace detail {
122 
124 {
130 
133 
136 
139 
142 
148 };
149 
150 template <typename BufferSequence>
151 auto
152 buffersBegin(BufferSequence const& bufs)
153 {
154  return boost::asio::buffers_iterator<BufferSequence, std::uint8_t>::begin(
155  bufs);
156 }
157 
158 template <typename BufferSequence>
159 auto
160 buffersEnd(BufferSequence const& bufs)
161 {
162  return boost::asio::buffers_iterator<BufferSequence, std::uint8_t>::end(
163  bufs);
164 }
165 
175 template <class BufferSequence>
178  boost::system::error_code& ec,
179  BufferSequence const& bufs,
180  std::size_t size)
181 {
182  using namespace ripple::compression;
183 
184  MessageHeader hdr;
185  auto iter = buffersBegin(bufs);
186  assert(iter != buffersEnd(bufs));
187 
188  // Check valid header compressed message:
189  // - 4 bits are the compression algorithm, 1st bit is always set to 1
190  // - 2 bits are always set to 0
191  // - 26 bits are the payload size
192  // - 32 bits are the uncompressed data size
193  if (*iter & 0x80)
194  {
196 
197  // not enough bytes to parse the header
198  if (size < hdr.header_size)
199  {
200  ec = make_error_code(boost::system::errc::success);
201  return std::nullopt;
202  }
203 
204  if (*iter & 0x0C)
205  {
206  ec = make_error_code(boost::system::errc::protocol_error);
207  return std::nullopt;
208  }
209 
210  hdr.algorithm = static_cast<compression::Algorithm>(*iter & 0xF0);
211 
213  {
214  ec = make_error_code(boost::system::errc::protocol_error);
215  return std::nullopt;
216  }
217 
218  for (int i = 0; i != 4; ++i)
219  hdr.payload_wire_size = (hdr.payload_wire_size << 8) + *iter++;
220 
221  // clear the top four bits (the compression bits).
222  hdr.payload_wire_size &= 0x0FFFFFFF;
223 
225 
226  for (int i = 0; i != 2; ++i)
227  hdr.message_type = (hdr.message_type << 8) + *iter++;
228 
229  for (int i = 0; i != 4; ++i)
230  hdr.uncompressed_size = (hdr.uncompressed_size << 8) + *iter++;
231 
232  return hdr;
233  }
234 
235  // Check valid header uncompressed message:
236  // - 6 bits are set to 0
237  // - 26 bits are the payload size
238  if ((*iter & 0xFC) == 0)
239  {
240  hdr.header_size = headerBytes;
241 
242  if (size < hdr.header_size)
243  {
244  ec = make_error_code(boost::system::errc::success);
245  return std::nullopt;
246  }
247 
248  hdr.algorithm = Algorithm::None;
249 
250  for (int i = 0; i != 4; ++i)
251  hdr.payload_wire_size = (hdr.payload_wire_size << 8) + *iter++;
252 
255 
256  for (int i = 0; i != 2; ++i)
257  hdr.message_type = (hdr.message_type << 8) + *iter++;
258 
259  return hdr;
260  }
261 
262  ec = make_error_code(boost::system::errc::no_message);
263  return std::nullopt;
264 }
265 
266 template <
267  class T,
268  class Buffers,
269  class = std::enable_if_t<
272 parseMessageContent(MessageHeader const& header, Buffers const& buffers)
273 {
274  auto const m = std::make_shared<T>();
275 
276  ZeroCopyInputStream<Buffers> stream(buffers);
277  stream.Skip(header.header_size);
278 
280  {
282  payload.resize(header.uncompressed_size);
283 
284  auto const payloadSize = ripple::compression::decompress(
285  stream,
286  header.payload_wire_size,
287  payload.data(),
288  header.uncompressed_size,
289  header.algorithm);
290 
291  if (payloadSize == 0 || !m->ParseFromArray(payload.data(), payloadSize))
292  return {};
293  }
294  else if (!m->ParseFromZeroCopyStream(&stream))
295  return {};
296 
297  return m;
298 }
299 
300 template <
301  class T,
302  class Buffers,
303  class Handler,
304  class = std::enable_if_t<
306 bool
307 invoke(MessageHeader const& header, Buffers const& buffers, Handler& handler)
308 {
309  auto const m = parseMessageContent<T>(header, buffers);
310  if (!m)
311  return false;
312 
313  using namespace ripple::compression;
314  handler.onMessageBegin(
315  header.message_type,
316  m,
317  header.payload_wire_size,
318  header.uncompressed_size,
319  header.algorithm != Algorithm::None);
320  handler.onMessage(m);
321  handler.onMessageEnd(header.message_type, m);
322 
323  return true;
324 }
325 
326 } // namespace detail
327 
340 template <class Buffers, class Handler>
343  Buffers const& buffers,
344  Handler& handler,
345  std::size_t& hint)
346 {
348 
349  auto const size = boost::asio::buffer_size(buffers);
350 
351  if (size == 0)
352  return result;
353 
354  auto header = detail::parseMessageHeader(result.second, buffers, size);
355 
356  // If we can't parse the header then it may be that we don't have enough
357  // bytes yet, or because the message was cut off (if error_code is success).
358  // Otherwise we failed to match the header's marker (error_code is set to
359  // no_message) or the compression algorithm is invalid (error_code is
360  // protocol_error) and signal an error.
361  if (!header)
362  return result;
363 
364  // We implement a maximum size for protocol messages. Sending a message
365  // whose size exceeds this may result in the connection being dropped. A
366  // larger message size may be supported in the future or negotiated as
367  // part of a protocol upgrade.
368  if (header->payload_wire_size > maximiumMessageSize ||
369  header->uncompressed_size > maximiumMessageSize)
370  {
371  result.second = make_error_code(boost::system::errc::message_size);
372  return result;
373  }
374 
375  // We requested uncompressed messages from the peer but received compressed.
376  if (!handler.compressionEnabled() &&
377  header->algorithm != compression::Algorithm::None)
378  {
379  result.second = make_error_code(boost::system::errc::protocol_error);
380  return result;
381  }
382 
383  // We don't have the whole message yet. This isn't an error but we have
384  // nothing to do.
385  if (header->total_wire_size > size)
386  {
387  hint = header->total_wire_size - size;
388  return result;
389  }
390 
391  bool success;
392 
393  switch (header->message_type)
394  {
395  case protocol::mtMANIFESTS:
396  success = detail::invoke<protocol::TMManifests>(
397  *header, buffers, handler);
398  break;
399  case protocol::mtPING:
400  success =
401  detail::invoke<protocol::TMPing>(*header, buffers, handler);
402  break;
403  case protocol::mtCLUSTER:
404  success =
405  detail::invoke<protocol::TMCluster>(*header, buffers, handler);
406  break;
407  case protocol::mtENDPOINTS:
408  success = detail::invoke<protocol::TMEndpoints>(
409  *header, buffers, handler);
410  break;
411  case protocol::mtTRANSACTION:
412  success = detail::invoke<protocol::TMTransaction>(
413  *header, buffers, handler);
414  break;
415  case protocol::mtGET_LEDGER:
416  success = detail::invoke<protocol::TMGetLedger>(
417  *header, buffers, handler);
418  break;
419  case protocol::mtLEDGER_DATA:
420  success = detail::invoke<protocol::TMLedgerData>(
421  *header, buffers, handler);
422  break;
423  case protocol::mtPROPOSE_LEDGER:
424  success = detail::invoke<protocol::TMProposeSet>(
425  *header, buffers, handler);
426  break;
427  case protocol::mtSTATUS_CHANGE:
428  success = detail::invoke<protocol::TMStatusChange>(
429  *header, buffers, handler);
430  break;
431  case protocol::mtHAVE_SET:
432  success = detail::invoke<protocol::TMHaveTransactionSet>(
433  *header, buffers, handler);
434  break;
435  case protocol::mtVALIDATION:
436  success = detail::invoke<protocol::TMValidation>(
437  *header, buffers, handler);
438  break;
439  case protocol::mtGET_PEER_SHARD_INFO:
440  success = detail::invoke<protocol::TMGetPeerShardInfo>(
441  *header, buffers, handler);
442  break;
443  case protocol::mtPEER_SHARD_INFO:
444  success = detail::invoke<protocol::TMPeerShardInfo>(
445  *header, buffers, handler);
446  break;
447  case protocol::mtVALIDATORLIST:
448  success = detail::invoke<protocol::TMValidatorList>(
449  *header, buffers, handler);
450  break;
451  case protocol::mtVALIDATORLISTCOLLECTION:
452  success = detail::invoke<protocol::TMValidatorListCollection>(
453  *header, buffers, handler);
454  break;
455  case protocol::mtGET_OBJECTS:
456  success = detail::invoke<protocol::TMGetObjectByHash>(
457  *header, buffers, handler);
458  break;
459  case protocol::mtHAVE_TRANSACTIONS:
460  success = detail::invoke<protocol::TMHaveTransactions>(
461  *header, buffers, handler);
462  break;
463  case protocol::mtTRANSACTIONS:
464  success = detail::invoke<protocol::TMTransactions>(
465  *header, buffers, handler);
466  break;
467  case protocol::mtSQUELCH:
468  success =
469  detail::invoke<protocol::TMSquelch>(*header, buffers, handler);
470  break;
471  case protocol::mtPROOF_PATH_REQ:
472  success = detail::invoke<protocol::TMProofPathRequest>(
473  *header, buffers, handler);
474  break;
475  case protocol::mtPROOF_PATH_RESPONSE:
476  success = detail::invoke<protocol::TMProofPathResponse>(
477  *header, buffers, handler);
478  break;
479  case protocol::mtREPLAY_DELTA_REQ:
480  success = detail::invoke<protocol::TMReplayDeltaRequest>(
481  *header, buffers, handler);
482  break;
483  case protocol::mtREPLAY_DELTA_RESPONSE:
484  success = detail::invoke<protocol::TMReplayDeltaResponse>(
485  *header, buffers, handler);
486  break;
487  case protocol::mtGET_PEER_SHARD_INFO_V2:
488  success = detail::invoke<protocol::TMGetPeerShardInfoV2>(
489  *header, buffers, handler);
490  break;
491  case protocol::mtPEER_SHARD_INFO_V2:
492  success = detail::invoke<protocol::TMPeerShardInfoV2>(
493  *header, buffers, handler);
494  break;
495  default:
496  handler.onMessageUnknown(header->message_type);
497  success = true;
498  break;
499  }
500 
501  result.first = header->total_wire_size;
502 
503  if (!success)
504  result.second = make_error_code(boost::system::errc::bad_message);
505 
506  return result;
507 }
508 
509 } // namespace ripple
510 
511 #endif
std::vector::resize
T resize(T... args)
ripple::detail::MessageHeader::message_type
std::uint16_t message_type
The type of the message.
Definition: ProtocolMessage.h:141
ripple::detail::parseMessageContent
std::shared_ptr< T > parseMessageContent(MessageHeader const &header, Buffers const &buffers)
Definition: ProtocolMessage.h:272
std::string
STL class.
std::shared_ptr
STL class.
ripple::maximiumMessageSize
constexpr std::size_t maximiumMessageSize
Definition: overlay/Message.h:38
ripple::detail::MessageHeader::payload_wire_size
std::uint32_t payload_wire_size
The size of the payload on the wire.
Definition: ProtocolMessage.h:135
ripple::detail::invoke
bool invoke(MessageHeader const &header, Buffers const &buffers, Handler &handler)
Definition: ProtocolMessage.h:307
std::pair
ripple::detail::MessageHeader::header_size
std::uint32_t header_size
The size of the header associated with this message.
Definition: ProtocolMessage.h:132
vector
ripple::compression::headerBytes
constexpr std::size_t headerBytes
Definition: Compression.h:31
ripple::detail::buffersEnd
auto buffersEnd(BufferSequence const &bufs)
Definition: ProtocolMessage.h:160
ripple::compression::Algorithm::LZ4
@ LZ4
ripple::ZeroCopyInputStream
Implements ZeroCopyInputStream around a buffer sequence.
Definition: ZeroCopyStream.h:35
ripple::detail::buffersBegin
auto buffersBegin(BufferSequence const &bufs)
Definition: ProtocolMessage.h:152
ripple::detail::MessageHeader::algorithm
compression::Algorithm algorithm
Indicates which compression algorithm the payload is compressed with.
Definition: ProtocolMessage.h:147
ripple::protocolMessageName
std::string protocolMessageName(int type)
Returns the name of a protocol message given its type.
Definition: ProtocolMessage.h:61
ripple::detail::MessageHeader::uncompressed_size
std::uint32_t uncompressed_size
Uncompressed message size if the message is compressed.
Definition: ProtocolMessage.h:138
std::enable_if_t
ripple::detail::MessageHeader::total_wire_size
std::uint32_t total_wire_size
The size of the message on the wire.
Definition: ProtocolMessage.h:129
ripple::detail::MessageHeader
Definition: ProtocolMessage.h:123
cstdint
std::uint32_t
ripple::compression
Definition: Compression.h:29
ripple::compression::Algorithm
Algorithm
Definition: Compression.h:36
memory
ripple::protocolMessageType
protocol::MessageType protocolMessageType(protocol::TMGetLedger const &)
Definition: ProtocolMessage.h:41
ripple::compression::decompress
std::size_t decompress(InputStream &in, std::size_t inSize, std::uint8_t *decompressed, std::size_t decompressedSize, Algorithm algorithm=Algorithm::LZ4)
Decompress input stream.
Definition: Compression.h:50
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
cassert
optional
std::size_t
ripple::compression::headerBytesCompressed
constexpr std::size_t headerBytesCompressed
Definition: Compression.h:32
ripple::invokeProtocolMessage
std::pair< std::size_t, boost::system::error_code > invokeProtocolMessage(Buffers const &buffers, Handler &handler, std::size_t &hint)
Calls the handler for up to one protocol message in the passed buffers.
Definition: ProtocolMessage.h:342
std::vector::data
T data(T... args)
type_traits
ripple::detail::parseMessageHeader
std::optional< MessageHeader > parseMessageHeader(boost::system::error_code &ec, BufferSequence const &bufs, std::size_t size)
Parse a message header.
Definition: ProtocolMessage.h:177
std::is_base_of
ripple::compression::Algorithm::None
@ None