Add ZeroCopyOutputStream and tidy up

This commit is contained in:
Vinnie Falco
2014-12-01 14:41:13 -08:00
committed by Nik Bougalis
parent 4a49fefdd9
commit 930a0beaf1
6 changed files with 248 additions and 167 deletions

View File

@@ -2516,6 +2516,8 @@
</ClInclude>
<ClInclude Include="..\..\src\ripple\overlay\impl\Tuning.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\overlay\impl\ZeroCopyStream.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\overlay\make_Overlay.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\overlay\Message.h">

View File

@@ -3540,6 +3540,9 @@
<ClInclude Include="..\..\src\ripple\overlay\impl\Tuning.h">
<Filter>ripple\overlay\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\overlay\impl\ZeroCopyStream.h">
<Filter>ripple\overlay\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\overlay\make_Overlay.h">
<Filter>ripple\overlay</Filter>
</ClInclude>

View File

@@ -20,6 +20,7 @@
#ifndef RIPPLE_UNIQUENODELIST_H_INCLUDED
#define RIPPLE_UNIQUENODELIST_H_INCLUDED
#include <ripple/app/peers/ClusterNodeStatus.h>
#include <beast/cxx14/memory.h> // <memory>
#include <beast/threads/Stoppable.h>
#include <boost/filesystem.hpp>

View File

@@ -1,43 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_OVERLAY_MESSAGESTREAM_H_INCLUDED
#define RIPPLE_OVERLAY_MESSAGESTREAM_H_INCLUDED
#include <cstddef>
namespace ripple {
/** Turns blocks of incoming data into protocol messages. */
class MessageStream
{
private:
std::size_t m_bytes;
std::vector <uint8_t> m_buffer;
public:
void
write (void const* buffer, std::size_t bytes)
{
}
};
} // ripple
#endif

View File

@@ -22,7 +22,7 @@
#include "ripple.pb.h"
#include <ripple/overlay/Message.h>
#include <google/protobuf/io/zero_copy_stream.h>
#include <ripple/overlay/impl/ZeroCopyStream.h>
#include <boost/asio/buffer.hpp>
#include <boost/asio/buffers_iterator.hpp>
#include <boost/system/error_code.hpp>
@@ -34,103 +34,34 @@
namespace ripple {
/** Implements ZeroCopyInputStream around a buffer sequence.
@tparam Buffers A type meeting the requirements of ConstBufferSequence.
*/
template <class Buffers>
class ZeroCopyInputStream
: public ::google::protobuf::io::ZeroCopyInputStream
/** Returns the name of a protocol message given its type. */
template <class = void>
std::string
protocolMessageName (int type)
{
private:
using iterator = typename Buffers::const_iterator;
using const_buffer = boost::asio::const_buffer;
std::int64_t count_ = 0;
iterator last_;
iterator first_; // Where pos_ comes from
const_buffer pos_; // What Next() will return
public:
ZeroCopyInputStream (Buffers const& buffers);
bool
Next (const void** data, int* size) override;
void
BackUp (int count) override;
bool
Skip (int count) override;
std::int64_t
ByteCount() const override
switch (type)
{
return count_;
}
case protocol::mtHELLO: return "hello";
case protocol::mtPING: return "ping";
case protocol::mtPROOFOFWORK: return "proof_of_work";
case protocol::mtCLUSTER: return "cluster";
case protocol::mtGET_PEERS: return "get_peers";
case protocol::mtPEERS: return "peers";
case protocol::mtENDPOINTS: return "endpoints";
case protocol::mtTRANSACTION: return "tx";
case protocol::mtGET_LEDGER: return "get_ledger";
case protocol::mtLEDGER_DATA: return "ledger_data";
case protocol::mtPROPOSE_LEDGER: return "propose";
case protocol::mtSTATUS_CHANGE: return "status";
case protocol::mtHAVE_SET: return "have_set";
case protocol::mtVALIDATION: return "validation";
case protocol::mtGET_OBJECTS: return "get_objects";
default:
break;
};
//------------------------------------------------------------------------------
template <class Buffers>
ZeroCopyInputStream<Buffers>::ZeroCopyInputStream (Buffers const& buffers)
: last_ (buffers.end())
, first_ (buffers.begin())
, pos_ ((first_ != last_) ?
*first_ : const_buffer(nullptr, 0))
{
return "unknown";
}
template <class Buffers>
bool
ZeroCopyInputStream<Buffers>::Next (const void** data, int* size)
{
*data = boost::asio::buffer_cast<void const*>(pos_);
*size = boost::asio::buffer_size(pos_);
if (first_ == last_)
return false;
count_ += *size;
pos_ = (++first_ != last_) ? *first_ :
const_buffer(nullptr, 0);
return true;
}
template <class Buffers>
void
ZeroCopyInputStream<Buffers>::BackUp (int count)
{
--first_;
pos_ = *first_ +
(boost::asio::buffer_size(*first_) - count);
count_ -= count;
}
template <class Buffers>
bool
ZeroCopyInputStream<Buffers>::Skip (int count)
{
if (first_ == last_)
return false;
while (count > 0)
{
auto const size =
boost::asio::buffer_size(pos_);
if (count < size)
{
pos_ = pos_ + count;
count_ += count;
return true;
}
count_ += size;
if (++first_ == last_)
return false;
count -= size;
pos_ = *first_;
}
return true;
}
//------------------------------------------------------------------------------
namespace detail {
template <class T, class Buffers, class Handler>
@@ -205,36 +136,6 @@ invokeProtocolMessage (Buffers const& buffers, Handler& handler)
return result;
}
//------------------------------------------------------------------------------
/** Returns the name of a protocol message given its type. */
inline
std::string
protocolMessageName (int type)
{
switch (type)
{
case protocol::mtHELLO: return "hello";
case protocol::mtPING: return "ping";
case protocol::mtPROOFOFWORK: return "proof_of_work";
case protocol::mtCLUSTER: return "cluster";
case protocol::mtGET_PEERS: return "get_peers";
case protocol::mtPEERS: return "peers";
case protocol::mtENDPOINTS: return "endpoints";
case protocol::mtTRANSACTION: return "tx";
case protocol::mtGET_LEDGER: return "get_ledger";
case protocol::mtLEDGER_DATA: return "ledger_data";
case protocol::mtPROPOSE_LEDGER: return "propose";
case protocol::mtSTATUS_CHANGE: return "status";
case protocol::mtHAVE_SET: return "have_set";
case protocol::mtVALIDATION: return "validation";
case protocol::mtGET_OBJECTS: return "get_objects";
default:
break;
};
return "unknown";
}
} // ripple
#endif

View File

@@ -0,0 +1,217 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_OVERLAY_ZEROCOPYSTREAM_H_INCLUDED
#define RIPPLE_OVERLAY_ZEROCOPYSTREAM_H_INCLUDED
#include <google/protobuf/io/zero_copy_stream.h>
#include <boost/asio/buffer.hpp>
#include <cstdint>
namespace ripple {
/** Implements ZeroCopyInputStream around a buffer sequence.
@tparam Buffers A type meeting the requirements of ConstBufferSequence.
@see https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.io.zero_copy_stream
*/
template <class Buffers>
class ZeroCopyInputStream
: public ::google::protobuf::io::ZeroCopyInputStream
{
private:
using iterator = typename Buffers::const_iterator;
using const_buffer = boost::asio::const_buffer;
std::int64_t count_ = 0;
iterator last_;
iterator first_; // Where pos_ comes from
const_buffer pos_; // What Next() will return
public:
explicit
ZeroCopyInputStream (Buffers const& buffers);
bool
Next (const void** data, int* size) override;
void
BackUp (int count) override;
bool
Skip (int count) override;
std::int64_t
ByteCount() const override
{
return count_;
}
};
//------------------------------------------------------------------------------
template <class Buffers>
ZeroCopyInputStream<Buffers>::ZeroCopyInputStream (
Buffers const& buffers)
: last_ (buffers.end())
, first_ (buffers.begin())
, pos_ ((first_ != last_) ?
*first_ : const_buffer(nullptr, 0))
{
}
template <class Buffers>
bool
ZeroCopyInputStream<Buffers>::Next (
const void** data, int* size)
{
*data = boost::asio::buffer_cast<void const*>(pos_);
*size = boost::asio::buffer_size(pos_);
if (first_ == last_)
return false;
count_ += *size;
pos_ = (++first_ != last_) ? *first_ :
const_buffer(nullptr, 0);
return true;
}
template <class Buffers>
void
ZeroCopyInputStream<Buffers>::BackUp (int count)
{
--first_;
pos_ = *first_ +
(boost::asio::buffer_size(*first_) - count);
count_ -= count;
}
template <class Buffers>
bool
ZeroCopyInputStream<Buffers>::Skip (int count)
{
if (first_ == last_)
return false;
while (count > 0)
{
auto const size =
boost::asio::buffer_size(pos_);
if (count < size)
{
pos_ = pos_ + count;
count_ += count;
return true;
}
count_ += size;
if (++first_ == last_)
return false;
count -= size;
pos_ = *first_;
}
return true;
}
//------------------------------------------------------------------------------
/** Implements ZeroCopyOutputStream around a Streambuf.
Streambuf matches the public interface defined by boost::asio::streambuf.
@tparam Streambuf A type meeting the requirements of Streambuf.
*/
template <class Streambuf>
class ZeroCopyOutputStream
: public ::google::protobuf::io::ZeroCopyOutputStream
{
private:
using buffers_type = typename Streambuf::mutable_buffers_type;
using iterator = typename buffers_type::const_iterator;
using mutable_buffer = boost::asio::mutable_buffer;
Streambuf& streambuf_;
std::size_t blockSize_;
std::size_t count_ = 0;
std::size_t commit_ = 0;
buffers_type buffers_;
iterator pos_;
public:
explicit
ZeroCopyOutputStream (Streambuf& streambuf,
std::size_t blockSize);
bool
Next (void** data, int* size) override;
void
BackUp (int count) override;
std::int64_t
ByteCount() const override
{
return count_;
}
};
//------------------------------------------------------------------------------
template <class Streambuf>
ZeroCopyOutputStream<Streambuf>::ZeroCopyOutputStream(
Streambuf& streambuf, std::size_t blockSize)
: streambuf_ (streambuf)
, blockSize_ (blockSize)
, buffers_ (streambuf_.prepare(blockSize_))
, pos_ (buffers_.begin())
{
}
template <class Streambuf>
bool
ZeroCopyOutputStream<Streambuf>::Next(
void** data, int* size)
{
if (commit_ != 0)
{
streambuf_.commit(commit_);
count_ += commit_;
}
if (pos_ == buffers_.end())
{
buffers_ = streambuf_.prepare (blockSize_);
pos_ = buffers_.begin();
}
*data = boost::asio::buffer_cast<void*>(*pos_);
*size = boost::asio::buffer_size(*pos_);
commit_ = *size;
++pos_;
return true;
}
template <class Streambuf>
void
ZeroCopyOutputStream<Streambuf>::BackUp (int count)
{
assert(count <= commit_);
auto const n = commit_ - count;
streambuf_.commit(n);
count_ += n;
commit_ = 0;
}
} // ripple
#endif