Provide a mechanism to configure when to flush a block

Summary: Allow the block-based table to configure how blocks are flushed. This feature will allow us to add support for prefix-aligned blocks.

Test Plan: make check

Reviewers: dhruba, haobo, sdong, igor

Reviewed By: sdong

CC: leveldb

Differential Revision: https://reviews.facebook.net/D13875
Kai Liu
2013-11-07 21:27:21 -08:00
parent bba6595b1f
commit fd075d6edd
8 changed files with 230 additions and 64 deletions


@@ -12,6 +12,7 @@
#include <assert.h>
#include <map>
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/cache.h"
#include "rocksdb/comparator.h"
#include "rocksdb/table.h"
@@ -96,19 +97,11 @@ struct BlockBasedTableBuilder::Rep {
char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
size_t compressed_cache_key_prefix_size;
// We do not emit the index entry for a block until we have seen the
// first key for the next data block. This allows us to use shorter
// keys in the index block. For example, consider a block boundary
// between the keys "the quick brown fox" and "the who". We can use
// "the r" as the key for the index block entry since it is >= all
// entries in the first block and < all entries in subsequent
// blocks.
//
// Invariant: r->pending_index_entry is true only if data_block is empty.
bool pending_index_entry;
BlockHandle pending_handle; // Handle to add to index block
std::string compressed_output;
std::unique_ptr<FlushBlockPolicy> flush_block_policy;
Rep(const Options& opt, WritableFile* f, CompressionType compression_type)
: options(opt),
@@ -118,8 +111,10 @@ struct BlockBasedTableBuilder::Rep {
index_block(1, index_block_options.comparator),
compression_type(compression_type),
filter_block(opt.filter_policy == nullptr ? nullptr
: new FilterBlockBuilder(opt)),
pending_index_entry(false) {
: new FilterBlockBuilder(opt)) {
assert(options.flush_block_policy_factory);
auto factory = options.flush_block_policy_factory;
flush_block_policy.reset(factory->NewFlushBlockPolicy(data_block));
}
};
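The Rep above now obtains its flush policy from options.flush_block_policy_factory at construction time. A caller-side sketch of setting that option follows; only the flush_block_policy_factory field and the SetUpDefaultFlushBlockPolicyFactory() helper appear in this change, so the field's exact type and everything else shown here should be read as assumptions rather than the actual API.

#include "rocksdb/options.h"

// Sketch: prepare Options whose table builder will use the size-based policy.
// SetUpDefaultFlushBlockPolicyFactory() appears to install a factory built from
// the current block_size (see the comment in the table_test hunk later in this
// diff), so it should be called after block_size is set.
rocksdb::Options MakeTableOptions() {
  rocksdb::Options options;
  options.block_size = 4 * 1024;                  // hypothetical value
  options.SetUpDefaultFlushBlockPolicyFactory();  // size-based default
  // A custom policy would instead be installed by assigning a caller-provided
  // factory to options.flush_block_policy_factory.
  return options;
}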
@@ -151,29 +146,25 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
}
const size_t curr_size = r->data_block.CurrentSizeEstimate();
const size_t estimated_size_after = r->data_block.EstimateSizeAfterKV(key,
value);
// Do flush if one of the below two conditions is true:
// 1) if the current estimated size already exceeds the block size,
// 2) block_size_deviation is set and the estimated size after appending
// the kv will exceed the block size and the current size is under the
// the deviation.
if (curr_size >= r->options.block_size ||
(estimated_size_after > r->options.block_size &&
r->options.block_size_deviation > 0 &&
(curr_size * 100) >
r->options.block_size * (100 - r->options.block_size_deviation))) {
auto should_flush = r->flush_block_policy->Update(key, value);
if (should_flush) {
assert(!r->data_block.empty());
Flush();
}
if (r->pending_index_entry) {
assert(r->data_block.empty());
r->options.comparator->FindShortestSeparator(&r->last_key, key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
// Add item to index block.
// We do not emit the index entry for a block until we have seen the
// first key for the next data block. This allows us to use shorter
// keys in the index block. For example, consider a block boundary
// between the keys "the quick brown fox" and "the who". We can use
// "the r" as the key for the index block entry since it is >= all
// entries in the first block and < all entries in subsequent
// blocks.
if (ok()) {
r->options.comparator->FindShortestSeparator(&r->last_key, key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
}
}
if (r->filter_block != nullptr) {
@@ -203,10 +194,8 @@ void BlockBasedTableBuilder::Flush() {
assert(!r->closed);
if (!ok()) return;
if (r->data_block.empty()) return;
assert(!r->pending_index_entry);
WriteBlock(&r->data_block, &r->pending_handle);
if (ok()) {
r->pending_index_entry = true;
r->status = r->file->Flush();
}
if (r->filter_block != nullptr) {
@@ -358,11 +347,14 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
Status BlockBasedTableBuilder::Finish() {
Rep* r = rep_;
bool empty_data_block = r->data_block.empty();
Flush();
assert(!r->closed);
r->closed = true;
BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
BlockHandle filter_block_handle,
metaindex_block_handle,
index_block_handle;
// Write filter block
if (ok() && r->filter_block != nullptr) {
@@ -373,12 +365,12 @@ Status BlockBasedTableBuilder::Finish() {
// To make sure stats block is able to keep the accurate size of index
// block, we will finish writing all index entries here and flush them
// to storage after metaindex block is written.
if (ok() && (r->pending_index_entry)) {
r->options.comparator->FindShortSuccessor(&r->last_key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
if (ok() && !empty_data_block) {
r->options.comparator->FindShortSuccessor(&r->last_key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, handle_encoding);
}
// Write meta blocks and metaindex block with the following order.
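The hunks above construct the policy through options.flush_block_policy_factory and consult it on every Add(), but the header rocksdb/flush_block_policy.h they include is not part of the hunks shown. Based on that usage it presumably declares roughly the following; the exact declarations, including the factory base class name, are assumptions inferred from the new .cc file below.

// Sketch of the interface implied by the usage above (not the verbatim header).
namespace rocksdb {

class Slice;
class BlockBuilder;

// Decides, key by key, when the table builder should cut the current data block.
class FlushBlockPolicy {
 public:
  // Called before each key/value is appended; returns true if the block built
  // so far should be flushed before this entry is added.
  virtual bool Update(const Slice& key, const Slice& value) = 0;

  virtual ~FlushBlockPolicy() { }
};

// Creates a policy bound to the data block builder whose size it observes.
class FlushBlockPolicyFactory {
 public:
  virtual FlushBlockPolicy* NewFlushBlockPolicy(
      const BlockBuilder& data_block_builder) const = 0;

  virtual ~FlushBlockPolicyFactory() { }
};

}  // namespace rocksdb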


@@ -0,0 +1,65 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include "rocksdb/flush_block_policy.h"
#include "rocksdb/slice.h"
#include "table/block_builder.h"

#include <cassert>

namespace rocksdb {

// Flush block by size
class FlushBlockBySizePolicy : public FlushBlockPolicy {
 public:
  // @params block_size:           Approximate size of user data packed per
  //                               block.
  // @params block_size_deviation: This is used to close a block before it
  //                               reaches the configured block size.
  FlushBlockBySizePolicy(const uint64_t block_size,
                         const uint64_t block_size_deviation,
                         const BlockBuilder& data_block_builder) :
      block_size_(block_size),
      block_size_deviation_(block_size_deviation),
      data_block_builder_(data_block_builder) {
  }

  virtual bool Update(const Slice& key,
                      const Slice& value) override {
    auto curr_size = data_block_builder_.CurrentSizeEstimate();

    // Do flush if one of the below two conditions is true:
    // 1) the current estimated size already exceeds the block size,
    // 2) block_size_deviation is set, the estimated size after appending the
    //    kv would exceed the block size, and the free space left in the block
    //    is within the deviation.
    return curr_size >= block_size_ || BlockAlmostFull(key, value);
  }

 private:
  bool BlockAlmostFull(const Slice& key, const Slice& value) const {
    const auto curr_size = data_block_builder_.CurrentSizeEstimate();
    const auto estimated_size_after =
        data_block_builder_.EstimateSizeAfterKV(key, value);

    return
      estimated_size_after > block_size_ &&
      block_size_deviation_ > 0 &&
      curr_size * 100 > block_size_ * (100 - block_size_deviation_);
  }

  const uint64_t block_size_;
  const uint64_t block_size_deviation_;
  const BlockBuilder& data_block_builder_;
};

FlushBlockPolicy* FlushBlockBySizePolicyFactory::NewFlushBlockPolicy(
    const BlockBuilder& data_block_builder) const {
  return new FlushBlockBySizePolicy(block_size_,
                                    block_size_deviation_,
                                    data_block_builder);
}

}  // namespace rocksdb
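The prefix-aligned blocks mentioned in the summary are the kind of policy this extension point is meant to enable. Below is a minimal sketch under the interface assumed above, using a fixed prefix length; the class and its wiring are illustrative and not part of this change.

#include <algorithm>
#include <string>

// Illustrative only: cut the block whenever the fixed-length key prefix
// changes, so entries sharing a prefix stay in one block; fall back to a size
// check so a single prefix cannot grow a block without bound.
class FlushBlockByPrefixPolicy : public FlushBlockPolicy {
 public:
  FlushBlockByPrefixPolicy(size_t prefix_len, uint64_t block_size,
                           const BlockBuilder& data_block_builder)
      : prefix_len_(prefix_len),
        block_size_(block_size),
        data_block_builder_(data_block_builder) {
  }

  virtual bool Update(const Slice& key, const Slice& value) override {
    const size_t len = std::min(prefix_len_, key.size());
    // Never request a flush while the block is still empty: the builder
    // asserts a non-empty data block whenever the policy returns true.
    if (data_block_builder_.empty()) {
      last_prefix_.assign(key.data(), len);
      return false;
    }
    const bool prefix_changed =
        Slice(key.data(), len).compare(last_prefix_) != 0;
    last_prefix_.assign(key.data(), len);
    return prefix_changed ||
           data_block_builder_.CurrentSizeEstimate() >= block_size_;
  }

 private:
  const size_t prefix_len_;
  const uint64_t block_size_;
  const BlockBuilder& data_block_builder_;
  std::string last_prefix_;
};

A matching factory would implement NewFlushBlockPolicy() to construct this class with the desired prefix length, mirroring FlushBlockBySizePolicyFactory above.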


@@ -30,6 +30,7 @@
namespace rocksdb {
namespace {
// Return reverse of "key".
// Used to test non-lexicographic comparators.
static std::string Reverse(const Slice& key) {
@@ -42,7 +43,12 @@ static std::string Reverse(const Slice& key) {
return rev;
}
namespace {
static Options GetDefaultOptions() {
Options options;
options.SetUpDefaultFlushBlockPolicyFactory();
return options;
}
class ReverseKeyComparator : public Comparator {
public:
virtual const char* Name() const {
@@ -423,7 +429,7 @@ class DBConstructor: public Constructor {
void NewDB() {
std::string name = test::TmpDir() + "/table_testdb";
Options options;
Options options = GetDefaultOptions();
options.comparator = comparator_;
Status status = DestroyDB(name, options);
ASSERT_TRUE(status.ok()) << status.ToString();
@@ -442,14 +448,16 @@ class DBConstructor: public Constructor {
static bool SnappyCompressionSupported() {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Snappy_Compress(Options().compression_opts, in.data(), in.size(),
return port::Snappy_Compress(GetDefaultOptions().compression_opts,
in.data(), in.size(),
&out);
}
static bool ZlibCompressionSupported() {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Zlib_Compress(Options().compression_opts, in.data(), in.size(),
return port::Zlib_Compress(GetDefaultOptions().compression_opts,
in.data(), in.size(),
&out);
}
@@ -457,7 +465,8 @@ static bool ZlibCompressionSupported() {
static bool BZip2CompressionSupported() {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::BZip2_Compress(Options().compression_opts, in.data(), in.size(),
return port::BZip2_Compress(GetDefaultOptions().compression_opts,
in.data(), in.size(),
&out);
}
#endif
@@ -527,7 +536,7 @@ class Harness {
void Init(const TestArgs& args) {
delete constructor_;
constructor_ = nullptr;
options_ = Options();
options_ = GetDefaultOptions();
options_.block_restart_interval = args.restart_interval;
options_.compression = args.compression;
@@ -727,7 +736,7 @@ class Harness {
DB* db() const { return constructor_->db(); }
private:
Options options_;
Options options_ = GetDefaultOptions();
Constructor* constructor_;
};
@@ -827,7 +836,7 @@ TEST(MemTableTest, Simple) {
MemTable* memtable = new MemTable(cmp, table_factory);
memtable->Ref();
WriteBatch batch;
Options options;
Options options = GetDefaultOptions();
WriteBatchInternal::SetSequence(&batch, 100);
batch.Put(std::string("k1"), std::string("v1"));
batch.Put(std::string("k2"), std::string("v2"));
@@ -878,7 +887,7 @@ TEST(TableTest, BasicTableStats) {
std::vector<std::string> keys;
KVMap kvmap;
Options options;
Options options = GetDefaultOptions();
options.compression = kNoCompression;
options.block_restart_interval = 1;
@@ -912,7 +921,7 @@ TEST(TableTest, FilterPolicyNameStats) {
c.Add("a1", "val1");
std::vector<std::string> keys;
KVMap kvmap;
Options options;
Options options = GetDefaultOptions();
std::unique_ptr<const FilterPolicy> filter_policy(
NewBloomFilterPolicy(10)
);
@@ -955,7 +964,7 @@ TEST(TableTest, IndexSizeStat) {
std::vector<std::string> ks;
KVMap kvmap;
Options options;
Options options = GetDefaultOptions();
options.compression = kNoCompression;
options.block_restart_interval = 1;
@@ -974,6 +983,9 @@ TEST(TableTest, NumBlockStat) {
options.compression = kNoCompression;
options.block_restart_interval = 1;
options.block_size = 1000;
// Block Size changed, need to set up a new flush policy to reflect the
// change.
options.SetUpDefaultFlushBlockPolicyFactory();
for (int i = 0; i < 10; ++i) {
// the key/val are slightly smaller than block size, so that each block
@@ -1001,7 +1013,7 @@ TEST(TableTest, ApproximateOffsetOfPlain) {
c.Add("k07", std::string(100000, 'x'));
std::vector<std::string> keys;
KVMap kvmap;
Options options;
Options options = GetDefaultOptions();
options.block_size = 1024;
options.compression = kNoCompression;
c.Finish(options, &keys, &kvmap);
@@ -1030,7 +1042,7 @@ static void Do_Compression_Test(CompressionType comp) {
c.Add("k04", test::CompressibleString(&rnd, 0.25, 10000, &tmp));
std::vector<std::string> keys;
KVMap kvmap;
Options options;
Options options = GetDefaultOptions();
options.block_size = 1024;
options.compression = comp;
c.Finish(options, &keys, &kvmap);
@@ -1072,7 +1084,7 @@ TEST(TableTest, BlockCacheLeak) {
// in the cache. This test checks whether the Table actually makes use of the
// unique ID from the file.
Options opt;
Options opt = GetDefaultOptions();
opt.block_size = 1024;
opt.compression = kNoCompression;
opt.block_cache = NewLRUCache(16*1024*1024); // big enough so we don't ever
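Taken together, the test changes above follow one pattern: construct options through GetDefaultOptions() so a flush policy factory is always installed, and re-run SetUpDefaultFlushBlockPolicyFactory() after changing block_size, since the size-based policy captures the block size when the factory is set up. A compact restatement (GetDefaultOptions is the helper added in this diff; GetSmallBlockOptions is illustrative):

#include "rocksdb/options.h"

static rocksdb::Options GetDefaultOptions() {
  rocksdb::Options options;
  options.SetUpDefaultFlushBlockPolicyFactory();
  return options;
}

static rocksdb::Options GetSmallBlockOptions() {
  rocksdb::Options options = GetDefaultOptions();
  options.block_size = 1000;
  // The factory installed above captured the old block_size; install a new
  // one so the flush policy reflects the change.
  options.SetUpDefaultFlushBlockPolicyFactory();
  return options;
}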