NuDB: Use nodeobject codec in Backend (RIPD-793):

This adds codecs for snappy and lz4, and a new nodeobject codec. The
nodeobject codec provides a highly efficient custom compression scheme
for inner nodes, which make up the majority of nodestore databases.
Non-inner-node objects are compressed using lz4.

The NuDB backend is modified to use the nodeobject codec. This change
is not backward compatible - older NuDB databases cannot be opened or
imported.
This commit is contained in:
Vinnie Falco
2015-02-06 11:32:11 -08:00
parent f946d7b447
commit b7ba509618
5 changed files with 530 additions and 194 deletions

View File

@@ -2112,6 +2112,8 @@
</ClCompile>
<ClInclude Include="..\..\src\ripple\nodestore\impl\BatchWriter.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\nodestore\impl\codec.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\nodestore\impl\DatabaseImp.h">
</ClInclude>
<ClCompile Include="..\..\src\ripple\nodestore\impl\DatabaseRotatingImp.cpp">

View File

@@ -3072,6 +3072,9 @@
<ClInclude Include="..\..\src\ripple\nodestore\impl\BatchWriter.h">
<Filter>ripple\nodestore\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\nodestore\impl\codec.h">
<Filter>ripple\nodestore\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\nodestore\impl\DatabaseImp.h">
<Filter>ripple\nodestore\impl</Filter>
</ClInclude>

View File

@@ -21,14 +21,14 @@
#include <ripple/nodestore/Factory.h>
#include <ripple/nodestore/Manager.h>
#include <ripple/nodestore/impl/codec.h>
#include <ripple/nodestore/impl/DecodedBlob.h>
#include <ripple/nodestore/impl/EncodedBlob.h>
#include <beast/nudb.h>
#include <beast/nudb/detail/bucket.h> // remove asap
#include <beast/nudb/detail/varint.h>
#include <beast/nudb/identity_codec.h>
#include <beast/nudb/visit.h>
#include <beast/hash/xxhasher.h>
#include <snappy.h>
#include <boost/filesystem.hpp>
#include <cassert>
#include <chrono>
@@ -50,22 +50,11 @@ public:
// distribution of data sizes.
arena_alloc_size = 16 * 1024 * 1024,
// Version 1
// No compression
//
typeOne = 1,
// Version 2
// Snappy compression
typeTwo = 2,
currentType = typeTwo
currentType = 1
};
using api = beast::nudb::api<
beast::xxhasher, beast::nudb::identity_codec>;
beast::xxhasher, nodeobject_codec>;
beast::Journal journal_;
size_t const keyBytes_;
@@ -137,74 +126,8 @@ public:
}
}
//--------------------------------------------------------------------------
class Buffer
{
private:
std::size_t size_ = 0;
std::size_t capacity_ = 0;
std::unique_ptr <std::uint8_t[]> buf_;
public:
Buffer() = default;
Buffer (Buffer const&) = delete;
Buffer& operator= (Buffer const&) = delete;
explicit
Buffer (std::size_t n)
{
resize (n);
}
std::size_t
size() const
{
return size_;
}
std::size_t
capacity() const
{
return capacity_;
}
void*
get()
{
return buf_.get();
}
void
resize (std::size_t n)
{
if (capacity_ < n)
{
capacity_ = beast::nudb::detail::ceil_pow2(n);
buf_.reset (new std::uint8_t[capacity_]);
}
size_ = n;
}
// Meet the requirements of BufferFactory
void*
operator() (std::size_t n)
{
resize(n);
return get();
}
};
//--------------------------------------------------------------------------
//
// Version 1 Database
//
// Uncompressed
//
Status
fetch1 (void const* key,
std::shared_ptr <NodeObject>* pno)
fetch (void const* key, NodeObject::Ptr* pno)
{
Status status;
pno->reset();
@@ -226,94 +149,13 @@ public:
return status;
}
void
insert1 (void const* key, void const* data,
std::size_t size)
{
db_.insert (key, data, size);
}
//--------------------------------------------------------------------------
//
// Version 2 Database
//
// Snappy compression
//
Status
fetch2 (void const* key,
std::shared_ptr <NodeObject>* pno)
{
Status status;
pno->reset();
if (! db_.fetch (key,
[&](void const* data, std::size_t size)
{
std::size_t actual;
if (! snappy::GetUncompressedLength(
(char const*)data, size, &actual))
{
status = dataCorrupt;
return;
}
std::unique_ptr <char[]> buf (new char[actual]);
snappy::RawUncompress (
(char const*)data, size, buf.get());
DecodedBlob decoded (key, buf.get(), actual);
if (! decoded.wasOk ())
{
status = dataCorrupt;
return;
}
*pno = decoded.createObject();
status = ok;
}))
{
return notFound;
}
return status;
}
void
insert2 (void const* key, void const* data,
std::size_t size)
{
std::unique_ptr<char[]> buf (
new char[snappy::MaxCompressedLength(size)]);
std::size_t actual;
snappy::RawCompress ((char const*)data, size,
buf.get(), &actual);
db_.insert (key, buf.get(), actual);
}
//--------------------------------------------------------------------------
Status
fetch (void const* key, NodeObject::Ptr* pno)
{
switch (db_.appnum())
{
case typeOne: return fetch1 (key, pno);
case typeTwo: return fetch2 (key, pno);
}
throw std::runtime_error(
"nodestore: unknown appnum");
return notFound;
}
void
do_insert (std::shared_ptr <NodeObject> const& no)
{
EncodedBlob e;
e.prepare (no);
switch (db_.appnum())
{
case typeOne: return insert1 (e.getKey(), e.getData(), e.getSize());
case typeTwo: return insert2 (e.getKey(), e.getData(), e.getSize());
}
throw std::runtime_error(
"nodestore: unknown appnum");
db_.insert (e.getKey(),
e.getData(), e.getSize());
}
void
@@ -352,40 +194,17 @@ public:
auto const dp = db_.dat_path();
auto const kp = db_.key_path();
auto const lp = db_.log_path();
auto const appnum = db_.appnum();
//auto const appnum = db_.appnum();
db_.close();
api::visit (dp,
[&](
void const* key, std::size_t key_bytes,
void const* data, std::size_t size)
{
switch (appnum)
{
case typeOne:
{
DecodedBlob decoded (key, data, size);
if (! decoded.wasOk ())
return false;
f (decoded.createObject());
break;
}
case typeTwo:
{
std::size_t actual;
if (! snappy::GetUncompressedLength(
(char const*)data, size, &actual))
return false;
std::unique_ptr <char[]> buf (new char[actual]);
if (! snappy::RawUncompress ((char const*)data,
size, buf.get()))
return false;
DecodedBlob decoded (key, buf.get(), actual);
if (! decoded.wasOk ())
return false;
f (decoded.createObject());
break;
}
}
DecodedBlob decoded (key, data, size);
if (! decoded.wasOk ())
return false;
f (decoded.createObject());
return true;
});
db_.open (dp, kp, lp,

View File

@@ -58,10 +58,10 @@ DecodedBlob::DecodedBlob (void const* key, void const* value, int valueBytes)
switch (m_objectType)
{
case hotUNKNOWN:
default:
break;
case hotUNKNOWN:
case hotLEDGER:
case hotTRANSACTION:
case hotACCOUNT_NODE:

View File

@@ -0,0 +1,512 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_NODESTORE_CODEC_H_INCLUDED
#define RIPPLE_NODESTORE_CODEC_H_INCLUDED
#include <ripple/protocol/HashPrefix.h>
#include <beast/nudb/common.h>
#include <beast/nudb/detail/varint.h>
#include <lz4/lib/lz4.h>
#include <snappy.h>
#include <array>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <utility>
namespace ripple {
namespace NodeStore {
namespace detail {
/*  Compress a buffer with snappy.

    The BufferFactory is invoked once with the worst-case
    output size and must return writable storage that stays
    valid until the returned pair is consumed.

    @return Pointer to, and size of, the compressed output.
*/
template <class BufferFactory>
std::pair<void const*, std::size_t>
snappy_compress (void const* in,
    std::size_t in_size, BufferFactory&& bf)
{
    // Ask snappy for the worst-case output size up front so
    // a single buffer request always suffices.
    auto const bound =
        snappy::MaxCompressedLength(in_size);
    void* const dest = bf(bound);
    std::size_t written;
    snappy::RawCompress(
        static_cast<char const*>(in), in_size,
        static_cast<char*>(dest), &written);
    return { dest, written };
}
/*  Decompress a snappy-compressed buffer.

    The uncompressed length is recovered from the snappy
    stream itself; the BufferFactory is invoked once with
    that length.

    @return Pointer to, and size of, the decompressed output.
    @throws beast::nudb::codec_error on malformed input.
*/
template <class BufferFactory>
std::pair<void const*, std::size_t>
snappy_decompress (void const* in,
    std::size_t in_size, BufferFactory&& bf)
{
    std::size_t out_size;
    if (! snappy::GetUncompressedLength(
            static_cast<char const*>(in),
            in_size, &out_size))
        throw beast::nudb::codec_error(
            "snappy decompress");
    void* const out = bf(out_size);
    if (! snappy::RawUncompress(
            static_cast<char const*>(in), in_size,
            static_cast<char*>(out)))
        throw beast::nudb::codec_error(
            "snappy decompress");
    return { out, out_size };
}
/*  Decompress an lz4 value produced by lz4_compress.

    The input begins with a varint holding the uncompressed
    size, followed by the lz4 block. The BufferFactory is
    invoked once with the uncompressed size.

    @return Pointer to, and size of, the decompressed output.
    @throws codec_error on a bad varint or corrupt stream.
*/
template <class BufferFactory>
std::pair<void const*, std::size_t>
lz4_decompress (void const* in,
    std::size_t in_size, BufferFactory&& bf)
{
    using beast::nudb::codec_error;
    using namespace beast::nudb::detail;
    std::pair<void const*, std::size_t> result;
    std::uint8_t const* p = reinterpret_cast<
        std::uint8_t const*>(in);
    // Read the uncompressed size; n is the number of
    // varint bytes consumed (0 means a malformed varint).
    auto const n = read_varint(
        p, in_size, result.second);
    if (n == 0)
        throw codec_error(
            "lz4 decompress");
    void* const out = bf(result.second);
    result.first = out;
    // LZ4_decompress_fast returns the number of input bytes
    // consumed; the whole remaining input must be consumed
    // exactly, otherwise the stream is corrupt.
    if (LZ4_decompress_fast(
        reinterpret_cast<char const*>(in) + n,
        reinterpret_cast<char*>(out),
        result.second) + n != in_size)
        throw codec_error(
            "lz4 decompress");
    return result;
}
/*  Compress a value with lz4, prefixed by its uncompressed size.

    The output is a varint holding in_size followed by the lz4
    block. The prefix allows lz4_decompress to size its output
    buffer before decompressing.

    @return Pointer to, and size of, the compressed output.
    @throws codec_error if lz4 rejects the input.
*/
template <class BufferFactory>
std::pair<void const*, std::size_t>
lz4_compress (void const* in,
    std::size_t in_size, BufferFactory&& bf)
{
    using beast::nudb::codec_error;
    using namespace beast::nudb::detail;
    std::pair<void const*, std::size_t> result;
    // Encode the uncompressed size as a varint header.
    std::array<std::uint8_t, varint_traits<
        std::size_t>::max> vi;
    auto const n = write_varint(
        vi.data(), in_size);
    // Worst-case lz4 output size, so one buffer request
    // always suffices.
    auto const out_max =
        LZ4_compressBound(in_size);
    std::uint8_t* out = reinterpret_cast<
        std::uint8_t*>(bf(n + out_max));
    result.first = out;
    std::memcpy(out, vi.data(), n);
    auto const out_size = LZ4_compress(
        reinterpret_cast<char const*>(in),
        reinterpret_cast<char*>(out + n),
        in_size);
    if (out_size == 0)
        throw codec_error(
            "lz4 compress");
    result.second = n + out_size;
    return result;
}
//------------------------------------------------------------------------------
/*
object types:
0 = Uncompressed
1 = lz4 compressed
2 = inner node compressed
3 = full inner node
*/
/*  Decompress a stored value back into canonical node object form.

    The value begins with a varint type code:
        0 = uncompressed payload, returned verbatim (no copy)
        1 = lz4 compressed payload
        2 = sparse inner node: 16-bit branch mask + one 32-byte
            hash per set mask bit
        3 = full inner node: exactly 16 x 32-byte hashes
    Types 2 and 3 are reconstituted into the fixed 525-byte wire
    form: two zeroed 32-bit fields, a 1-byte type (hotUNKNOWN),
    the 4-byte innerNode hash prefix, then 16 hashes of 32 bytes
    (absent branches zero-filled).

    @return Pointer to, and size of, the decoded object.
    @throws codec_error on any malformed input.
*/
template <class BufferFactory>
std::pair<void const*, std::size_t>
nodeobject_decompress (void const* in,
    std::size_t in_size, BufferFactory&& bf)
{
    using beast::nudb::codec_error;
    using namespace beast::nudb::detail;
    std::uint8_t const* p = reinterpret_cast<
        std::uint8_t const*>(in);
    // Leading varint selects the encoding.
    std::size_t type;
    auto const vn = read_varint(
        p, in_size, type);
    if (vn == 0)
        throw codec_error(
            "nodeobject decompress");
    p += vn;
    in_size -= vn;
    std::pair<void const*, std::size_t> result;
    switch(type)
    {
    case 0: // uncompressed
    {
        // Point directly into the stored value; no copy made.
        result.first = p;
        result.second = in_size;
        break;
    }
    case 1: // lz4
    {
        result = lz4_decompress(
            p, in_size, bf);
        break;
    }
    case 2: // inner node
    {
        auto const hs =
            field<std::uint16_t>::size; // Mask
        // Need at least the mask plus one hash.
        if (in_size < hs + 32)
            throw codec_error(
                "nodeobject codec: short inner node");
        istream is(p, in_size);
        std::uint16_t mask;
        read<std::uint16_t>(is, mask); // Mask
        in_size -= hs;
        // 4 + 4 + 1 byte header + 4 byte prefix + 512 bytes
        // of hashes = 525, the canonical inner node size.
        result.second = 525;
        void* const out = bf(result.second);
        result.first = out;
        ostream os(out, result.second);
        write<std::uint32_t>(os, 0);
        write<std::uint32_t>(os, 0);
        write<std::uint8_t> (os, hotUNKNOWN);
        write<std::uint32_t>(os, HashPrefix::innerNode);
        if (mask == 0)
            throw codec_error(
                "nodeobject codec: empty inner node");
        // Walk mask bits high to low, matching the order in
        // which nodeobject_compress stored the hashes.
        std::uint16_t bit = 0x8000;
        for (int i = 16; i--; bit >>= 1)
        {
            if (mask & bit)
            {
                if (in_size < 32)
                    throw codec_error(
                        "nodeobject codec: short inner node");
                std::memcpy(os.data(32), is(32), 32);
                in_size -= 32;
            }
            else
            {
                // Branch absent: emit a zero hash.
                std::memset(os.data(32), 0, 32);
            }
        }
        // Every stored hash must have been consumed.
        if (in_size > 0)
            throw codec_error(
                "nodeobject codec: long inner node");
        break;
    }
    case 3: // full inner node
    {
        // Payload must be exactly the 512 bytes of hashes.
        if (in_size != 16 * 32) // hashes
            throw codec_error(
                "nodeobject codec: short full inner node");
        istream is(p, in_size);
        result.second = 525;
        void* const out = bf(result.second);
        result.first = out;
        ostream os(out, result.second);
        write<std::uint32_t>(os, 0);
        write<std::uint32_t>(os, 0);
        write<std::uint8_t> (os, hotUNKNOWN);
        write<std::uint32_t>(os, HashPrefix::innerNode);
        write(os, is(512), 512);
        break;
    }
    default:
        throw codec_error(
            "nodeobject codec: bad type=" +
            std::to_string(type));
    };
    return result;
}
/*  Return a pointer to a shared, immutable block of 32 zero
    bytes, used to detect empty inner node branches.
*/
template <class = void>
void const*
zero32()
{
    // Value-initialization zero-fills the array; built once.
    static std::array<char, 32> const zeros{};
    return zeros.data();
}
/*  Compress a node object for storage.

    A 525-byte value carrying the innerNode hash prefix is
    stored in a custom form: type 2 (branch mask + present
    hashes) when some branches are empty, type 3 (all 16
    hashes, no mask) when full. The first 9 bytes of an inner
    node (two 32-bit fields and the type byte) are not stored;
    nodeobject_decompress regenerates them as zero/hotUNKNOWN.
    Everything else is stored as type 1 (lz4).

    @return Pointer to, and size of, the encoded value.
    @throws codec_error / std::logic_error on internal errors.
*/
template <class BufferFactory>
std::pair<void const*, std::size_t>
nodeobject_compress (void const* in,
    std::size_t in_size, BufferFactory&& bf)
{
    using beast::nudb::codec_error;
    using namespace beast::nudb::detail;
    // Default encoding for non inner nodes. NOTE(review):
    // type 0 (uncompressed) is handled below but never
    // selected by this function.
    std::size_t type = 1;
    // Check for inner node
    if (in_size == 525)
    {
        istream is(in, in_size);
        std::uint32_t index;
        std::uint32_t unused;
        std::uint8_t kind;
        std::uint32_t prefix;
        read<std::uint32_t>(is, index);
        read<std::uint32_t>(is, unused);
        read<std::uint8_t> (is, kind);
        read<std::uint32_t>(is, prefix);
        if (prefix == HashPrefix::innerNode)
        {
            // Gather non-empty hashes and build the branch
            // mask; the high bit corresponds to the first
            // branch, matching the decompressor's order.
            std::size_t n = 0;
            std::uint16_t mask = 0;
            std::array<
                std::uint8_t, 512> vh;
            for (unsigned bit = 0x8000;
                bit; bit >>= 1)
            {
                void const* const h = is(32);
                if (std::memcmp(
                        h, zero32(), 32) == 0)
                    continue;
                std::memcpy(
                    vh.data() + 32 * n, h, 32);
                mask |= bit;
                ++n;
            }
            std::pair<void const*,
                std::size_t> result;
            if (n < 16)
            {
                // 2 = inner node compressed
                auto const type = 2U; // shadows outer `type`
                auto const vs = size_varint(type);
                result.second =
                    vs +
                    field<std::uint16_t>::size + // mask
                    n * 32;                      // hashes
                std::uint8_t* out = reinterpret_cast<
                    std::uint8_t*>(bf(result.second));
                result.first = out;
                ostream os(out, result.second);
                write<varint>(os, type);
                write<std::uint16_t>(os, mask);
                write(os, vh.data(), n * 32);
                return result;
            }
            // 3 = full inner node
            auto const type = 3U; // shadows outer `type`
            auto const vs = size_varint(type);
            result.second =
                vs +
                n * 32; // hashes
            std::uint8_t* out = reinterpret_cast<
                std::uint8_t*>(bf(result.second));
            result.first = out;
            ostream os(out, result.second);
            write<varint>(os, type);
            write(os, vh.data(), n * 32);
            return result;
        }
    }
    // Not an inner node: emit the type varint followed by
    // the (possibly compressed) payload.
    std::array<std::uint8_t, varint_traits<
        std::size_t>::max> vi;
    auto const vn = write_varint(
        vi.data(), type);
    std::pair<void const*, std::size_t> result;
    switch(type)
    {
    case 0: // uncompressed
    {
        result.second = vn + in_size;
        std::uint8_t* p = reinterpret_cast<
            std::uint8_t*>(bf(result.second));
        result.first = p;
        std::memcpy(p, vi.data(), vn);
        std::memcpy(p + vn, in, in_size);
        break;
    }
    case 1: // lz4
    {
        // Request vn extra bytes from the caller's factory so
        // the varint header and lz4 output share one buffer.
        std::uint8_t* p;
        auto const lzr = lz4_compress(
            in, in_size, [&p, &vn, &bf]
            (std::size_t n)
            {
                p = reinterpret_cast<
                    std::uint8_t*>(
                        bf(vn + n));
                return p + vn;
            });
        std::memcpy(p, vi.data(), vn);
        result.first = p;
        result.second = vn + lzr.second;
        break;
    }
    default:
        throw std::logic_error(
            "nodeobject codec: unknown=" +
            std::to_string(type));
    };
    return result;
}
} // detail
// Modifies an inner node to erase the ledger
// sequence and type information so the codec
// verification can pass.
//
template <class = void>
void
filter_inner (void* in, std::size_t in_size)
{
    using beast::nudb::codec_error;
    using namespace beast::nudb::detail;
    // Check for inner node
    if (in_size == 525)
    {
        istream is(in, in_size);
        std::uint32_t index;
        std::uint32_t unused;
        std::uint8_t kind;
        std::uint32_t prefix;
        read<std::uint32_t>(is, index);
        read<std::uint32_t>(is, unused);
        read<std::uint8_t> (is, kind);
        read<std::uint32_t>(is, prefix);
        if (prefix == HashPrefix::innerNode)
        {
            // Overwrite the first 9 bytes in place (two 32-bit
            // fields and the type byte) with the same values
            // nodeobject_decompress emits, so a round trip
            // compares byte-identical.
            ostream os(in, 9);
            write<std::uint32_t>(os, 0);
            write<std::uint32_t>(os, 0);
            write<std::uint8_t> (os, hotUNKNOWN);
        }
    }
}
//------------------------------------------------------------------------------
class snappy_codec
{
public:
template <class... Args>
explicit
snappy_codec(Args&&... args)
{
}
char const*
name() const
{
return "snappy";
}
template <class BufferFactory>
std::pair<void const*, std::size_t>
compress (void const* in,
std::size_t in_size, BufferFactory&& bf) const
{
return snappy_compress(in, in_size, bf);
}
template <class BufferFactory>
std::pair<void const*, std::size_t>
decompress (void const* in,
std::size_t in_size, BufferFactory&& bf) const
{
return snappy_decompress(in, in_size, bf);
}
};
class lz4_codec
{
public:
template <class... Args>
explicit
lz4_codec(Args&&... args)
{
}
char const*
name() const
{
return "lz4";
}
template <class BufferFactory>
std::pair<void const*, std::size_t>
decompress (void const* in,
std::size_t in_size, BufferFactory&& bf) const
{
return lz4_compress(in, in_size, bf);
}
template <class BufferFactory>
std::pair<void const*, std::size_t>
compress (void const* in,
std::size_t in_size, BufferFactory&& bf) const
{
return lz4_compress(in, in_size, bf);
}
};
/** NuDB codec adaptor for ripple node objects.

    Inner nodes receive a custom compact encoding; all other
    objects are stored lz4 compressed (see
    detail::nodeobject_compress). Meets the Codec
    requirements of beast::nudb::api.
*/
class nodeobject_codec
{
public:
    template <class... Args>
    explicit
    nodeobject_codec(Args&&... args)
    {
    }

    /** Codec name reported through the NuDB api. */
    char const*
    name() const
    {
        return "nodeobject";
    }

    template <class BufferFactory>
    std::pair<void const*, std::size_t>
    compress (void const* in,
        std::size_t in_size, BufferFactory&& bf) const
    {
        return detail::nodeobject_compress(
            in, in_size, bf);
    }

    template <class BufferFactory>
    std::pair<void const*, std::size_t>
    decompress (void const* in,
        std::size_t in_size, BufferFactory&& bf) const
    {
        return detail::nodeobject_decompress(
            in, in_size, bf);
    }
};
} // NodeStore
} // ripple
#endif