rippled
ProtocolMessage.h
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #ifndef RIPPLE_OVERLAY_PROTOCOLMESSAGE_H_INCLUDED
21 #define RIPPLE_OVERLAY_PROTOCOLMESSAGE_H_INCLUDED
22 
23 #include <ripple/basics/ByteUtilities.h>
24 #include <ripple/overlay/Compression.h>
25 #include <ripple/overlay/Message.h>
26 #include <ripple/overlay/impl/ZeroCopyStream.h>
27 #include <ripple/protocol/messages.h>
28 #include <boost/asio/buffer.hpp>
29 #include <boost/asio/buffers_iterator.hpp>
30 #include <boost/system/error_code.hpp>
31 #include <cassert>
32 #include <cstdint>
33 #include <memory>
34 #include <optional>
35 #include <type_traits>
36 #include <vector>
37 
38 namespace ripple {
39 
40 inline protocol::MessageType
41 protocolMessageType(protocol::TMGetLedger const&)
42 {
43  return protocol::mtGET_LEDGER;
44 }
45 
46 inline protocol::MessageType
47 protocolMessageType(protocol::TMReplayDeltaRequest const&)
48 {
49  return protocol::mtREPLAY_DELTA_REQ;
50 }
51 
52 inline protocol::MessageType
53 protocolMessageType(protocol::TMProofPathRequest const&)
54 {
55  return protocol::mtPROOF_PATH_REQ;
56 }
57 
59 template <class = void>
62 {
63  switch (type)
64  {
65  case protocol::mtMANIFESTS:
66  return "manifests";
67  case protocol::mtPING:
68  return "ping";
69  case protocol::mtCLUSTER:
70  return "cluster";
71  case protocol::mtENDPOINTS:
72  return "endpoints";
73  case protocol::mtTRANSACTION:
74  return "tx";
75  case protocol::mtGET_LEDGER:
76  return "get_ledger";
77  case protocol::mtLEDGER_DATA:
78  return "ledger_data";
79  case protocol::mtPROPOSE_LEDGER:
80  return "propose";
81  case protocol::mtSTATUS_CHANGE:
82  return "status";
83  case protocol::mtHAVE_SET:
84  return "have_set";
85  case protocol::mtVALIDATORLIST:
86  return "validator_list";
87  case protocol::mtVALIDATORLISTCOLLECTION:
88  return "validator_list_collection";
89  case protocol::mtVALIDATION:
90  return "validation";
91  case protocol::mtGET_PEER_SHARD_INFO:
92  return "get_peer_shard_info";
93  case protocol::mtPEER_SHARD_INFO:
94  return "peer_shard_info";
95  case protocol::mtGET_OBJECTS:
96  return "get_objects";
97  case protocol::mtHAVE_TRANSACTIONS:
98  return "have_transactions";
99  case protocol::mtTRANSACTIONS:
100  return "transactions";
101  case protocol::mtSQUELCH:
102  return "squelch";
103  case protocol::mtPROOF_PATH_REQ:
104  return "proof_path_request";
105  case protocol::mtPROOF_PATH_RESPONSE:
106  return "proof_path_response";
107  case protocol::mtREPLAY_DELTA_REQ:
108  return "replay_delta_request";
109  case protocol::mtREPLAY_DELTA_RESPONSE:
110  return "replay_delta_response";
111  case protocol::mtGET_PEER_SHARD_INFO_V2:
112  return "get_peer_shard_info_v2";
113  case protocol::mtPEER_SHARD_INFO_V2:
114  return "peer_shard_info_v2";
115  case protocol::mtSTART_PROTOCOL:
116  return "start_protocol";
117  case protocol::mtGRACEFUL_CLOSE:
118  return "graceful_close";
119  default:
120  break;
121  }
122  return "unknown";
123 }
124 
125 namespace detail {
126 
128 {
134 
137 
140 
143 
146 
152 };
153 
154 template <typename BufferSequence>
155 auto
156 buffersBegin(BufferSequence const& bufs)
157 {
158  return boost::asio::buffers_iterator<BufferSequence, std::uint8_t>::begin(
159  bufs);
160 }
161 
162 template <typename BufferSequence>
163 auto
164 buffersEnd(BufferSequence const& bufs)
165 {
166  return boost::asio::buffers_iterator<BufferSequence, std::uint8_t>::end(
167  bufs);
168 }
169 
179 template <class BufferSequence>
182  boost::system::error_code& ec,
183  BufferSequence const& bufs,
184  std::size_t size)
185 {
186  using namespace ripple::compression;
187 
188  MessageHeader hdr;
189  auto iter = buffersBegin(bufs);
190  assert(iter != buffersEnd(bufs));
191 
192  // Check valid header compressed message:
193  // - 4 bits are the compression algorithm, 1st bit is always set to 1
194  // - 2 bits are always set to 0
195  // - 26 bits are the payload size
196  // - 32 bits are the uncompressed data size
197  if (*iter & 0x80)
198  {
200 
201  // not enough bytes to parse the header
202  if (size < hdr.header_size)
203  {
204  ec = make_error_code(boost::system::errc::success);
205  return std::nullopt;
206  }
207 
208  if (*iter & 0x0C)
209  {
210  ec = make_error_code(boost::system::errc::protocol_error);
211  return std::nullopt;
212  }
213 
214  hdr.algorithm = static_cast<compression::Algorithm>(*iter & 0xF0);
215 
217  {
218  ec = make_error_code(boost::system::errc::protocol_error);
219  return std::nullopt;
220  }
221 
222  for (int i = 0; i != 4; ++i)
223  hdr.payload_wire_size = (hdr.payload_wire_size << 8) + *iter++;
224 
225  // clear the top four bits (the compression bits).
226  hdr.payload_wire_size &= 0x0FFFFFFF;
227 
229 
230  for (int i = 0; i != 2; ++i)
231  hdr.message_type = (hdr.message_type << 8) + *iter++;
232 
233  for (int i = 0; i != 4; ++i)
234  hdr.uncompressed_size = (hdr.uncompressed_size << 8) + *iter++;
235 
236  return hdr;
237  }
238 
239  // Check valid header uncompressed message:
240  // - 6 bits are set to 0
241  // - 26 bits are the payload size
242  if ((*iter & 0xFC) == 0)
243  {
244  hdr.header_size = headerBytes;
245 
246  if (size < hdr.header_size)
247  {
248  ec = make_error_code(boost::system::errc::success);
249  return std::nullopt;
250  }
251 
252  hdr.algorithm = Algorithm::None;
253 
254  for (int i = 0; i != 4; ++i)
255  hdr.payload_wire_size = (hdr.payload_wire_size << 8) + *iter++;
256 
259 
260  for (int i = 0; i != 2; ++i)
261  hdr.message_type = (hdr.message_type << 8) + *iter++;
262 
263  return hdr;
264  }
265 
266  ec = make_error_code(boost::system::errc::no_message);
267  return std::nullopt;
268 }
269 
270 template <
271  class T,
272  class Buffers,
273  class = std::enable_if_t<
276 parseMessageContent(MessageHeader const& header, Buffers const& buffers)
277 {
278  auto const m = std::make_shared<T>();
279 
280  ZeroCopyInputStream<Buffers> stream(buffers);
281  stream.Skip(header.header_size);
282 
284  {
286  payload.resize(header.uncompressed_size);
287 
288  auto const payloadSize = ripple::compression::decompress(
289  stream,
290  header.payload_wire_size,
291  payload.data(),
292  header.uncompressed_size,
293  header.algorithm);
294 
295  if (payloadSize == 0 || !m->ParseFromArray(payload.data(), payloadSize))
296  return {};
297  }
298  else if (!m->ParseFromZeroCopyStream(&stream))
299  return {};
300 
301  return m;
302 }
303 
304 template <
305  class T,
306  class Buffers,
307  class Handler,
308  class = std::enable_if_t<
310 bool
311 invoke(MessageHeader const& header, Buffers const& buffers, Handler& handler)
312 {
313  auto const m = parseMessageContent<T>(header, buffers);
314  if (!m)
315  return false;
316 
317  using namespace ripple::compression;
318  handler.onMessageBegin(
319  header.message_type,
320  m,
321  header.payload_wire_size,
322  header.uncompressed_size,
323  header.algorithm != Algorithm::None);
324  handler.onMessage(m);
325  handler.onMessageEnd(header.message_type, m);
326 
327  return true;
328 }
329 
330 } // namespace detail
331 
344 template <class Buffers, class Handler>
347  Buffers const& buffers,
348  Handler& handler,
349  std::size_t& hint)
350 {
352 
353  auto const size = boost::asio::buffer_size(buffers);
354 
355  if (size == 0)
356  return result;
357 
358  auto header = detail::parseMessageHeader(result.second, buffers, size);
359 
360  // If we can't parse the header then it may be that we don't have enough
361  // bytes yet, or because the message was cut off (if error_code is success).
362  // Otherwise we failed to match the header's marker (error_code is set to
363  // no_message) or the compression algorithm is invalid (error_code is
364  // protocol_error) and signal an error.
365  if (!header)
366  return result;
367 
368  // We implement a maximum size for protocol messages. Sending a message
369  // whose size exceeds this may result in the connection being dropped. A
370  // larger message size may be supported in the future or negotiated as
371  // part of a protocol upgrade.
372  if (header->payload_wire_size > maximiumMessageSize ||
373  header->uncompressed_size > maximiumMessageSize)
374  {
375  result.second = make_error_code(boost::system::errc::message_size);
376  return result;
377  }
378 
379  // We requested uncompressed messages from the peer but received compressed.
380  if (!handler.compressionEnabled() &&
381  header->algorithm != compression::Algorithm::None)
382  {
383  result.second = make_error_code(boost::system::errc::protocol_error);
384  return result;
385  }
386 
387  // We don't have the whole message yet. This isn't an error but we have
388  // nothing to do.
389  if (header->total_wire_size > size)
390  {
391  hint = header->total_wire_size - size;
392  return result;
393  }
394 
395  bool success;
396 
397  switch (header->message_type)
398  {
399  case protocol::mtMANIFESTS:
400  success = detail::invoke<protocol::TMManifests>(
401  *header, buffers, handler);
402  break;
403  case protocol::mtPING:
404  success =
405  detail::invoke<protocol::TMPing>(*header, buffers, handler);
406  break;
407  case protocol::mtCLUSTER:
408  success =
409  detail::invoke<protocol::TMCluster>(*header, buffers, handler);
410  break;
411  case protocol::mtENDPOINTS:
412  success = detail::invoke<protocol::TMEndpoints>(
413  *header, buffers, handler);
414  break;
415  case protocol::mtTRANSACTION:
416  success = detail::invoke<protocol::TMTransaction>(
417  *header, buffers, handler);
418  break;
419  case protocol::mtGET_LEDGER:
420  success = detail::invoke<protocol::TMGetLedger>(
421  *header, buffers, handler);
422  break;
423  case protocol::mtLEDGER_DATA:
424  success = detail::invoke<protocol::TMLedgerData>(
425  *header, buffers, handler);
426  break;
427  case protocol::mtPROPOSE_LEDGER:
428  success = detail::invoke<protocol::TMProposeSet>(
429  *header, buffers, handler);
430  break;
431  case protocol::mtSTATUS_CHANGE:
432  success = detail::invoke<protocol::TMStatusChange>(
433  *header, buffers, handler);
434  break;
435  case protocol::mtHAVE_SET:
436  success = detail::invoke<protocol::TMHaveTransactionSet>(
437  *header, buffers, handler);
438  break;
439  case protocol::mtVALIDATION:
440  success = detail::invoke<protocol::TMValidation>(
441  *header, buffers, handler);
442  break;
443  case protocol::mtGET_PEER_SHARD_INFO:
444  success = detail::invoke<protocol::TMGetPeerShardInfo>(
445  *header, buffers, handler);
446  break;
447  case protocol::mtPEER_SHARD_INFO:
448  success = detail::invoke<protocol::TMPeerShardInfo>(
449  *header, buffers, handler);
450  break;
451  case protocol::mtVALIDATORLIST:
452  success = detail::invoke<protocol::TMValidatorList>(
453  *header, buffers, handler);
454  break;
455  case protocol::mtVALIDATORLISTCOLLECTION:
456  success = detail::invoke<protocol::TMValidatorListCollection>(
457  *header, buffers, handler);
458  break;
459  case protocol::mtGET_OBJECTS:
460  success = detail::invoke<protocol::TMGetObjectByHash>(
461  *header, buffers, handler);
462  break;
463  case protocol::mtHAVE_TRANSACTIONS:
464  success = detail::invoke<protocol::TMHaveTransactions>(
465  *header, buffers, handler);
466  break;
467  case protocol::mtTRANSACTIONS:
468  success = detail::invoke<protocol::TMTransactions>(
469  *header, buffers, handler);
470  break;
471  case protocol::mtSQUELCH:
472  success =
473  detail::invoke<protocol::TMSquelch>(*header, buffers, handler);
474  break;
475  case protocol::mtPROOF_PATH_REQ:
476  success = detail::invoke<protocol::TMProofPathRequest>(
477  *header, buffers, handler);
478  break;
479  case protocol::mtPROOF_PATH_RESPONSE:
480  success = detail::invoke<protocol::TMProofPathResponse>(
481  *header, buffers, handler);
482  break;
483  case protocol::mtREPLAY_DELTA_REQ:
484  success = detail::invoke<protocol::TMReplayDeltaRequest>(
485  *header, buffers, handler);
486  break;
487  case protocol::mtREPLAY_DELTA_RESPONSE:
488  success = detail::invoke<protocol::TMReplayDeltaResponse>(
489  *header, buffers, handler);
490  break;
491  case protocol::mtGET_PEER_SHARD_INFO_V2:
492  success = detail::invoke<protocol::TMGetPeerShardInfoV2>(
493  *header, buffers, handler);
494  break;
495  case protocol::mtPEER_SHARD_INFO_V2:
496  success = detail::invoke<protocol::TMPeerShardInfoV2>(
497  *header, buffers, handler);
498  break;
499  case protocol::mtSTART_PROTOCOL:
500  success = detail::invoke<protocol::TMStartProtocol>(
501  *header, buffers, handler);
502  break;
503  case protocol::mtGRACEFUL_CLOSE:
504  success = detail::invoke<protocol::TMGracefulClose>(
505  *header, buffers, handler);
506  break;
507  default:
508  handler.onMessageUnknown(header->message_type);
509  success = true;
510  break;
511  }
512 
513  result.first = header->total_wire_size;
514 
515  if (!success)
516  result.second = make_error_code(boost::system::errc::bad_message);
517 
518  return result;
519 }
520 
521 } // namespace ripple
522 
523 #endif
std::vector::resize
T resize(T... args)
ripple::detail::MessageHeader::message_type
std::uint16_t message_type
The type of the message.
Definition: ProtocolMessage.h:145
ripple::detail::parseMessageContent
std::shared_ptr< T > parseMessageContent(MessageHeader const &header, Buffers const &buffers)
Definition: ProtocolMessage.h:276
std::string
STL class.
std::shared_ptr
STL class.
ripple::maximiumMessageSize
constexpr std::size_t maximiumMessageSize
Definition: overlay/Message.h:38
ripple::detail::MessageHeader::payload_wire_size
std::uint32_t payload_wire_size
The size of the payload on the wire.
Definition: ProtocolMessage.h:139
ripple::detail::invoke
bool invoke(MessageHeader const &header, Buffers const &buffers, Handler &handler)
Definition: ProtocolMessage.h:311
std::pair
ripple::detail::MessageHeader::header_size
std::uint32_t header_size
The size of the header associated with this message.
Definition: ProtocolMessage.h:136
vector
ripple::compression::headerBytes
constexpr std::size_t headerBytes
Definition: Compression.h:31
ripple::detail::buffersEnd
auto buffersEnd(BufferSequence const &bufs)
Definition: ProtocolMessage.h:164
ripple::compression::Algorithm::LZ4
@ LZ4
ripple::ZeroCopyInputStream
Implements ZeroCopyInputStream around a buffer sequence.
Definition: ZeroCopyStream.h:35
ripple::detail::buffersBegin
auto buffersBegin(BufferSequence const &bufs)
Definition: ProtocolMessage.h:156
ripple::detail::MessageHeader::algorithm
compression::Algorithm algorithm
Indicates which compression algorithm the payload is compressed with.
Definition: ProtocolMessage.h:151
ripple::protocolMessageName
std::string protocolMessageName(int type)
Returns the name of a protocol message given its type.
Definition: ProtocolMessage.h:61
ripple::detail::MessageHeader::uncompressed_size
std::uint32_t uncompressed_size
Uncompressed message size if the message is compressed.
Definition: ProtocolMessage.h:142
std::enable_if_t
ripple::detail::MessageHeader::total_wire_size
std::uint32_t total_wire_size
The size of the message on the wire.
Definition: ProtocolMessage.h:133
ripple::detail::MessageHeader
Definition: ProtocolMessage.h:127
cstdint
std::uint32_t
ripple::compression
Definition: Compression.h:29
ripple::compression::Algorithm
Algorithm
Definition: Compression.h:36
memory
ripple::protocolMessageType
protocol::MessageType protocolMessageType(protocol::TMGetLedger const &)
Definition: ProtocolMessage.h:41
ripple::compression::decompress
std::size_t decompress(InputStream &in, std::size_t inSize, std::uint8_t *decompressed, std::size_t decompressedSize, Algorithm algorithm=Algorithm::LZ4)
Decompress input stream.
Definition: Compression.h:50
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
cassert
optional
std::size_t
ripple::compression::headerBytesCompressed
constexpr std::size_t headerBytesCompressed
Definition: Compression.h:32
ripple::invokeProtocolMessage
std::pair< std::size_t, boost::system::error_code > invokeProtocolMessage(Buffers const &buffers, Handler &handler, std::size_t &hint)
Calls the handler for up to one protocol message in the passed buffers.
Definition: ProtocolMessage.h:346
std::vector::data
T data(T... args)
type_traits
ripple::detail::parseMessageHeader
std::optional< MessageHeader > parseMessageHeader(boost::system::error_code &ec, BufferSequence const &bufs, std::size_t size)
Parse a message header.
Definition: ProtocolMessage.h:181
std::is_base_of
ripple::compression::Algorithm::None
@ None