diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj index ba0f8f79f..449cc00e1 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj +++ b/Builds/VisualStudio2013/RippleD.vcxproj @@ -2516,6 +2516,8 @@ + + diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters index 6d409e3bb..1efd106d5 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters @@ -3540,6 +3540,9 @@ ripple\overlay\impl + + ripple\overlay\impl + ripple\overlay diff --git a/src/ripple/app/peers/UniqueNodeList.h b/src/ripple/app/peers/UniqueNodeList.h index 545ad9150..d887c54f4 100644 --- a/src/ripple/app/peers/UniqueNodeList.h +++ b/src/ripple/app/peers/UniqueNodeList.h @@ -20,6 +20,7 @@ #ifndef RIPPLE_UNIQUENODELIST_H_INCLUDED #define RIPPLE_UNIQUENODELIST_H_INCLUDED +#include #include // #include #include diff --git a/src/ripple/overlay/impl/MessageStream.h b/src/ripple/overlay/impl/MessageStream.h deleted file mode 100644 index 008f05d5c..000000000 --- a/src/ripple/overlay/impl/MessageStream.h +++ /dev/null @@ -1,43 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef RIPPLE_OVERLAY_MESSAGESTREAM_H_INCLUDED -#define RIPPLE_OVERLAY_MESSAGESTREAM_H_INCLUDED - -#include - -namespace ripple { - -/** Turns blocks of incoming data into protocol messages. */ -class MessageStream -{ -private: - std::size_t m_bytes; - std::vector m_buffer; - -public: - void - write (void const* buffer, std::size_t bytes) - { - } -}; - -} // ripple - -#endif diff --git a/src/ripple/overlay/impl/ProtocolMessage.h b/src/ripple/overlay/impl/ProtocolMessage.h index e627434e9..a10371ff6 100644 --- a/src/ripple/overlay/impl/ProtocolMessage.h +++ b/src/ripple/overlay/impl/ProtocolMessage.h @@ -22,7 +22,7 @@ #include "ripple.pb.h" #include -#include +#include #include #include #include @@ -34,103 +34,34 @@ namespace ripple { -/** Implements ZeroCopyInputStream around a buffer sequence. - @tparam Buffers A type meeting the requirements of ConstBufferSequence. -*/ -template -class ZeroCopyInputStream - : public ::google::protobuf::io::ZeroCopyInputStream +/** Returns the name of a protocol message given its type. */ +template +std::string +protocolMessageName (int type) { -private: - using iterator = typename Buffers::const_iterator; - using const_buffer = boost::asio::const_buffer; - - std::int64_t count_ = 0; - iterator last_; - iterator first_; // Where pos_ comes from - const_buffer pos_; // What Next() will return - -public: - ZeroCopyInputStream (Buffers const& buffers); - - bool - Next (const void** data, int* size) override; - - void - BackUp (int count) override; - - bool - Skip (int count) override; - - std::int64_t - ByteCount() const override + switch (type) { - return count_; - } -}; - -//------------------------------------------------------------------------------ - -template -ZeroCopyInputStream::ZeroCopyInputStream (Buffers const& buffers) - : last_ (buffers.end()) - , first_ (buffers.begin()) - , pos_ ((first_ != last_) ? - *first_ : const_buffer(nullptr, 0)) -{ + case protocol::mtHELLO: return "hello"; + case protocol::mtPING: return "ping"; + case protocol::mtPROOFOFWORK: return "proof_of_work"; + case protocol::mtCLUSTER: return "cluster"; + case protocol::mtGET_PEERS: return "get_peers"; + case protocol::mtPEERS: return "peers"; + case protocol::mtENDPOINTS: return "endpoints"; + case protocol::mtTRANSACTION: return "tx"; + case protocol::mtGET_LEDGER: return "get_ledger"; + case protocol::mtLEDGER_DATA: return "ledger_data"; + case protocol::mtPROPOSE_LEDGER: return "propose"; + case protocol::mtSTATUS_CHANGE: return "status"; + case protocol::mtHAVE_SET: return "have_set"; + case protocol::mtVALIDATION: return "validation"; + case protocol::mtGET_OBJECTS: return "get_objects"; + default: + break; + }; + return "unknown"; } -template -bool -ZeroCopyInputStream::Next (const void** data, int* size) -{ - *data = boost::asio::buffer_cast(pos_); - *size = boost::asio::buffer_size(pos_); - if (first_ == last_) - return false; - count_ += *size; - pos_ = (++first_ != last_) ? *first_ : - const_buffer(nullptr, 0); - return true; -} - -template -void -ZeroCopyInputStream::BackUp (int count) -{ - --first_; - pos_ = *first_ + - (boost::asio::buffer_size(*first_) - count); - count_ -= count; -} - -template -bool -ZeroCopyInputStream::Skip (int count) -{ - if (first_ == last_) - return false; - while (count > 0) - { - auto const size = - boost::asio::buffer_size(pos_); - if (count < size) - { - pos_ = pos_ + count; - count_ += count; - return true; - } - count_ += size; - if (++first_ == last_) - return false; - count -= size; - pos_ = *first_; - } - return true; -} - -//------------------------------------------------------------------------------ - namespace detail { template @@ -205,36 +136,6 @@ invokeProtocolMessage (Buffers const& buffers, Handler& handler) return result; } -//------------------------------------------------------------------------------ - -/** Returns the name of a protocol message given its type. */ -inline -std::string -protocolMessageName (int type) -{ - switch (type) - { - case protocol::mtHELLO: return "hello"; - case protocol::mtPING: return "ping"; - case protocol::mtPROOFOFWORK: return "proof_of_work"; - case protocol::mtCLUSTER: return "cluster"; - case protocol::mtGET_PEERS: return "get_peers"; - case protocol::mtPEERS: return "peers"; - case protocol::mtENDPOINTS: return "endpoints"; - case protocol::mtTRANSACTION: return "tx"; - case protocol::mtGET_LEDGER: return "get_ledger"; - case protocol::mtLEDGER_DATA: return "ledger_data"; - case protocol::mtPROPOSE_LEDGER: return "propose"; - case protocol::mtSTATUS_CHANGE: return "status"; - case protocol::mtHAVE_SET: return "have_set"; - case protocol::mtVALIDATION: return "validation"; - case protocol::mtGET_OBJECTS: return "get_objects"; - default: - break; - }; - return "unknown"; -} - } // ripple #endif diff --git a/src/ripple/overlay/impl/ZeroCopyStream.h b/src/ripple/overlay/impl/ZeroCopyStream.h new file mode 100644 index 000000000..44e2460e7 --- /dev/null +++ b/src/ripple/overlay/impl/ZeroCopyStream.h @@ -0,0 +1,217 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_OVERLAY_ZEROCOPYSTREAM_H_INCLUDED +#define RIPPLE_OVERLAY_ZEROCOPYSTREAM_H_INCLUDED + +#include +#include +#include + +namespace ripple { + +/** Implements ZeroCopyInputStream around a buffer sequence. + @tparam Buffers A type meeting the requirements of ConstBufferSequence. + @see https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.io.zero_copy_stream +*/ +template +class ZeroCopyInputStream + : public ::google::protobuf::io::ZeroCopyInputStream +{ +private: + using iterator = typename Buffers::const_iterator; + using const_buffer = boost::asio::const_buffer; + + std::int64_t count_ = 0; + iterator last_; + iterator first_; // Where pos_ comes from + const_buffer pos_; // What Next() will return + +public: + explicit + ZeroCopyInputStream (Buffers const& buffers); + + bool + Next (const void** data, int* size) override; + + void + BackUp (int count) override; + + bool + Skip (int count) override; + + std::int64_t + ByteCount() const override + { + return count_; + } +}; + +//------------------------------------------------------------------------------ + +template +ZeroCopyInputStream::ZeroCopyInputStream ( + Buffers const& buffers) + : last_ (buffers.end()) + , first_ (buffers.begin()) + , pos_ ((first_ != last_) ? + *first_ : const_buffer(nullptr, 0)) +{ +} + +template +bool +ZeroCopyInputStream::Next ( + const void** data, int* size) +{ + *data = boost::asio::buffer_cast(pos_); + *size = boost::asio::buffer_size(pos_); + if (first_ == last_) + return false; + count_ += *size; + pos_ = (++first_ != last_) ? *first_ : + const_buffer(nullptr, 0); + return true; +} + +template +void +ZeroCopyInputStream::BackUp (int count) +{ + --first_; + pos_ = *first_ + + (boost::asio::buffer_size(*first_) - count); + count_ -= count; +} + +template +bool +ZeroCopyInputStream::Skip (int count) +{ + if (first_ == last_) + return false; + while (count > 0) + { + auto const size = + boost::asio::buffer_size(pos_); + if (count < size) + { + pos_ = pos_ + count; + count_ += count; + return true; + } + count_ += size; + if (++first_ == last_) + return false; + count -= size; + pos_ = *first_; + } + return true; +} + +//------------------------------------------------------------------------------ + +/** Implements ZeroCopyOutputStream around a Streambuf. + Streambuf matches the public interface defined by boost::asio::streambuf. + @tparam Streambuf A type meeting the requirements of Streambuf. +*/ +template +class ZeroCopyOutputStream + : public ::google::protobuf::io::ZeroCopyOutputStream +{ +private: + using buffers_type = typename Streambuf::mutable_buffers_type; + using iterator = typename buffers_type::const_iterator; + using mutable_buffer = boost::asio::mutable_buffer; + + Streambuf& streambuf_; + std::size_t blockSize_; + std::size_t count_ = 0; + std::size_t commit_ = 0; + buffers_type buffers_; + iterator pos_; + +public: + explicit + ZeroCopyOutputStream (Streambuf& streambuf, + std::size_t blockSize); + + bool + Next (void** data, int* size) override; + + void + BackUp (int count) override; + + std::int64_t + ByteCount() const override + { + return count_; + } +}; + +//------------------------------------------------------------------------------ + +template +ZeroCopyOutputStream::ZeroCopyOutputStream( + Streambuf& streambuf, std::size_t blockSize) + : streambuf_ (streambuf) + , blockSize_ (blockSize) + , buffers_ (streambuf_.prepare(blockSize_)) + , pos_ (buffers_.begin()) +{ +} + +template +bool +ZeroCopyOutputStream::Next( + void** data, int* size) +{ + if (commit_ != 0) + { + streambuf_.commit(commit_); + count_ += commit_; + } + + if (pos_ == buffers_.end()) + { + buffers_ = streambuf_.prepare (blockSize_); + pos_ = buffers_.begin(); + } + + *data = boost::asio::buffer_cast(*pos_); + *size = boost::asio::buffer_size(*pos_); + commit_ = *size; + ++pos_; + return true; +} + +template +void +ZeroCopyOutputStream::BackUp (int count) +{ + assert(count <= commit_); + auto const n = commit_ - count; + streambuf_.commit(n); + count_ += n; + commit_ = 0; +} + +} // ripple + +#endif