Make the block-based table's index pluggable

Summary:
This patch introduces a new table option that allows us to make the
block-based table's index pluggable.

To support this new feature:

* Code has been refactored to be more flexible and to support this option well.
* More documentation has been added for existing obscure functionality.
* Major surgery on DataBlockReader(), where the logic was really convoluted.
* Other small code cleanups.

The pluggability will mostly affect development of internal modules
and won't change frequently. As a result, I intentionally avoided
heavy-weight patterns (like a factory) and tried to keep it simple.
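
A minimal usage sketch of the new option (illustrative only; it assumes the
NewBlockBasedTableFactory() helper from rocksdb/table.h; if this revision does
not have it, construct the BlockBasedTableFactory directly from the
BlockBasedTableOptions):

    #include "rocksdb/options.h"
    #include "rocksdb/table.h"

    rocksdb::Options options;
    rocksdb::BlockBasedTableOptions table_options;
    // Pick the index implementation; kBinarySearch is the only index type
    // shipped with this patch.
    table_options.index_type = rocksdb::BlockBasedTableOptions::kBinarySearch;
    // NewBlockBasedTableFactory() is assumed to exist here; adjust if the
    // factory must be constructed directly in this revision.
    options.table_factory.reset(
        rocksdb::NewBlockBasedTableFactory(table_options));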

Test Plan: make all check

Reviewers: haobo, sdong

Reviewed By: sdong

CC: leveldb

Differential Revision: https://reviews.facebook.net/D16395
kailiu
2014-02-28 18:19:07 -08:00
parent bf86af5174
commit 74939a9e13
7 changed files with 777 additions and 548 deletions


@@ -11,23 +11,29 @@
#include <assert.h>
#include <inttypes.h>
#include <map>
#include <stdio.h>
#include "rocksdb/flush_block_policy.h"
#include <map>
#include <memory>
#include "db/dbformat.h"
#include "rocksdb/cache.h"
#include "rocksdb/comparator.h"
#include "table/table_builder.h"
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/options.h"
#include "db/dbformat.h"
#include "table/block_based_table_reader.h"
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/block_based_table_reader.h"
#include "table/block_builder.h"
#include "table/filter_block.h"
#include "table/format.h"
#include "table/meta_blocks.h"
#include "table/table_builder.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/stop_watch.h"
@@ -36,11 +42,167 @@ namespace rocksdb {
namespace {
static bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
typedef BlockBasedTableOptions::IndexType IndexType;
// The interface for building an index.
// Instructions for adding a new concrete IndexBuilder:
// 1. Create a subclass derived from IndexBuilder.
// 2. Add a new entry associated with that subclass in BlockBasedTableOptions::IndexType.
// 3. Add a create function for the new subclass in CreateIndexBuilder.
// Note: we could devise a more advanced design to simplify the process of
// adding a new subclass, but that would increase code complexity and draw
// unwanted attention from readers. Given that we won't add/change indexes
// frequently, it makes sense to embrace a more straightforward design that
// just works.
class IndexBuilder {
public:
explicit IndexBuilder(const Comparator* comparator)
: comparator_(comparator) {}
virtual ~IndexBuilder() {}
// Add a new index entry to the index block.
// To allow further optimization, we provide `last_key_in_current_block` and
// `first_key_in_next_block`, based on which the specific implementation can
// determine the best index key to be used for the index block.
// @last_key_in_current_block: this parameter may be overwritten with a
// shortened "substitute key".
// @first_key_in_next_block: it will be nullptr if the entry being added is
// the last one in the table
//
// REQUIRES: Finish() has not yet been called.
virtual void AddEntry(std::string* last_key_in_current_block,
const Slice* first_key_in_next_block,
const BlockHandle& block_handle) = 0;
// Inform the index builder that all entries have been written. Block builder
// may therefore perform any operation required for block finalization.
//
// REQUIRES: Finish() has not yet been called.
virtual Slice Finish() = 0;
// Get the estimated size for the index block.
virtual size_t EstimatedSize() const = 0;
protected:
const Comparator* comparator_;
};
// This index builder builds a space-efficient index block for
// binary-search-based index lookup.
//
// Optimizations:
// 1. Set the index block's `block_restart_interval` to 1, which avoids a
// linear search during index lookups.
// 2. Shorten the index keys. Rather than using the last key of each data
// block verbatim, we find a shorter substitute key that serves the same
// purpose.
class BinarySearchIndexBuilder : public IndexBuilder {
public:
explicit BinarySearchIndexBuilder(const Comparator* comparator)
: IndexBuilder(comparator),
index_block_builder_(1 /* block_restart_interval == 1 */, comparator) {}
virtual void AddEntry(std::string* last_key_in_current_block,
const Slice* first_key_in_next_block,
const BlockHandle& block_handle) override {
if (first_key_in_next_block != nullptr) {
comparator_->FindShortestSeparator(last_key_in_current_block,
*first_key_in_next_block);
} else {
comparator_->FindShortSuccessor(last_key_in_current_block);
}
std::string handle_encoding;
block_handle.EncodeTo(&handle_encoding);
index_block_builder_.Add(*last_key_in_current_block, handle_encoding);
}
virtual Slice Finish() override { return index_block_builder_.Finish(); }
virtual size_t EstimatedSize() const {
return index_block_builder_.CurrentSizeEstimate();
}
private:
BlockBuilder index_block_builder_;
};
// Create an index builder based on its type.
IndexBuilder* CreateIndexBuilder(IndexType type, const Comparator* comparator) {
switch (type) {
case BlockBasedTableOptions::kBinarySearch: {
return new BinarySearchIndexBuilder(comparator);
}
default: {
assert(!"Do not recognize the index type ");
return nullptr;
}
}
// impossible.
assert(false);
return nullptr;
}
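// Illustrative sketch only, not part of this patch: following the three steps
// listed above the IndexBuilder interface, a hypothetical new index type
// (say, kHashSearch backed by a HashIndexBuilder) would be wired in roughly
// like this:
//
//   class HashIndexBuilder : public IndexBuilder {
//    public:
//     explicit HashIndexBuilder(const Comparator* comparator)
//         : IndexBuilder(comparator) {}
//     virtual void AddEntry(std::string* last_key_in_current_block,
//                           const Slice* first_key_in_next_block,
//                           const BlockHandle& block_handle) override {
//       // Accumulate whatever per-block state this index needs, e.g. the
//       // encoded block handle keyed by a hash of the block's key range.
//     }
//     virtual Slice Finish() override {
//       // Serialize the accumulated state into the on-disk index block.
//       return Slice(buffer_);
//     }
//     virtual size_t EstimatedSize() const { return buffer_.size(); }
//    private:
//     std::string buffer_;
//   };
//
// Then add kHashSearch to BlockBasedTableOptions::IndexType and a matching
// case to CreateIndexBuilder():
//
//   case BlockBasedTableOptions::kHashSearch:
//     return new HashIndexBuilder(comparator);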
bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
// Check to see if compressed less than 12.5%
return compressed_size < raw_size - (raw_size / 8u);
}
Slice CompressBlock(const Slice& raw,
const CompressionOptions& compression_options,
CompressionType* type, std::string* compressed_output) {
if (*type == kNoCompression) {
return raw;
}
// Will return the compressed block contents if (1) the compression method is
// supported on this platform and (2) the compression ratio is "good enough".
switch (*type) {
case kSnappyCompression:
if (port::Snappy_Compress(compression_options, raw.data(), raw.size(),
compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break; // fall back to no compression.
case kZlibCompression:
if (port::Zlib_Compress(compression_options, raw.data(), raw.size(),
compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break; // fall back to no compression.
case kBZip2Compression:
if (port::BZip2_Compress(compression_options, raw.data(), raw.size(),
compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break; // fall back to no compression.
case kLZ4Compression:
if (port::LZ4_Compress(compression_options, raw.data(), raw.size(),
compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break; // fall back to no compression.
case kLZ4HCCompression:
if (port::LZ4HC_Compress(compression_options, raw.data(), raw.size(),
compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break; // fall back to no compression.
default: {} // Do not recognize this compression type
}
// Compression method is not supported, or the compression ratio is not good
// enough, so just fall back to the uncompressed form.
*type = kNoCompression;
return raw;
}
} // anonymous namespace
// kBlockBasedTableMagicNumber was picked by running
@@ -51,6 +213,46 @@ static bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
extern const uint64_t kBlockBasedTableMagicNumber
= 0xdb4775248b80fb57ull;
// A collector that collects properties of interest to block-based table.
// For now this class looks heavy-weight since we only write one additional
// property.
// But in the foreseeable future, we will add more and more properties that are
// specific to block-based table.
class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
: public TablePropertiesCollector {
public:
BlockBasedTablePropertiesCollector(
BlockBasedTableOptions::IndexType index_type)
: index_type_(index_type) {}
virtual Status Add(const Slice& key, const Slice& value) {
// Intentionally left blank. Have no interest in collecting stats for
// individual key/value pairs.
return Status::OK();
}
virtual Status Finish(UserCollectedProperties* properties) {
std::string val;
PutFixed32(&val, static_cast<uint32_t>(index_type_));
properties->insert({BlockBasedTablePropertyNames::kIndexType, val});
return Status::OK();
}
// The name of the properties collector can be used for debugging purposes.
virtual const char* Name() const {
return "BlockBasedTablePropertiesCollector";
}
virtual UserCollectedProperties GetReadableProperties() const {
// Intentionally left blank.
return UserCollectedProperties();
}
private:
BlockBasedTableOptions::IndexType index_type_;
};
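// Sketch, not part of this patch: on the read side, the persisted index type
// can be recovered from the table's user-collected properties. `props` below
// stands for the loaded UserCollectedProperties; DecodeFixed32() (from
// util/coding.h) is the counterpart of the PutFixed32() call above.
//
//   auto pos = props.find(BlockBasedTablePropertyNames::kIndexType);
//   BlockBasedTableOptions::IndexType index_type =
//       BlockBasedTableOptions::kBinarySearch;  // default when absent
//   if (pos != props.end()) {
//     index_type = static_cast<BlockBasedTableOptions::IndexType>(
//         DecodeFixed32(pos->second.c_str()));
//   }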
struct BlockBasedTableBuilder::Rep {
Options options;
const InternalKeyComparator& internal_comparator;
@@ -58,7 +260,8 @@ struct BlockBasedTableBuilder::Rep {
uint64_t offset = 0;
Status status;
BlockBuilder data_block;
BlockBuilder index_block;
std::unique_ptr<IndexBuilder> index_builder;
std::string last_key;
CompressionType compression_type;
TableProperties props;
@@ -75,28 +278,31 @@ struct BlockBasedTableBuilder::Rep {
Rep(const Options& opt, const InternalKeyComparator& icomparator,
WritableFile* f, FlushBlockPolicyFactory* flush_block_policy_factory,
CompressionType compression_type)
CompressionType compression_type, IndexType index_block_type)
: options(opt),
internal_comparator(icomparator),
file(f),
data_block(options, &internal_comparator),
// To avoid linear scan, we make the block_restart_interval to be `1`
// in index block builder
index_block(1 /* block_restart_interval */, &internal_comparator),
index_builder(
CreateIndexBuilder(index_block_type, &internal_comparator)),
compression_type(compression_type),
filter_block(opt.filter_policy == nullptr
? nullptr
: new FilterBlockBuilder(opt, &internal_comparator)),
flush_block_policy(flush_block_policy_factory->NewFlushBlockPolicy(
options, data_block)) {}
options, data_block)) {
options.table_properties_collectors.push_back(
std::make_shared<BlockBasedTablePropertiesCollector>(index_block_type));
}
};
BlockBasedTableBuilder::BlockBasedTableBuilder(
const Options& options, const InternalKeyComparator& internal_comparator,
WritableFile* file, FlushBlockPolicyFactory* flush_block_policy_factory,
const Options& options, const BlockBasedTableOptions& table_options,
const InternalKeyComparator& internal_comparator, WritableFile* file,
CompressionType compression_type)
: rep_(new Rep(options, internal_comparator, file,
flush_block_policy_factory, compression_type)) {
table_options.flush_block_policy_factory.get(),
compression_type, table_options.index_type)) {
if (rep_->filter_block != nullptr) {
rep_->filter_block->StartBlock(0);
}
@@ -136,10 +342,7 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
// entries in the first block and < all entries in subsequent
// blocks.
if (ok()) {
r->internal_comparator.FindShortestSeparator(&r->last_key, key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->index_builder->AddEntry(&r->last_key, &key, r->pending_handle);
}
}
@@ -179,88 +382,25 @@ void BlockBasedTableBuilder::Flush() {
void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
BlockHandle* handle) {
WriteBlock(block->Finish(), handle);
block->Reset();
}
void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
BlockHandle* handle) {
// File format contains a sequence of blocks where each block has:
// block_data: uint8[n]
// type: uint8
// crc: uint32
assert(ok());
Rep* r = rep_;
Slice raw = block->Finish();
Slice block_contents;
std::string* compressed = &r->compressed_output;
CompressionType type = r->compression_type;
switch (type) {
case kNoCompression:
block_contents = raw;
break;
case kSnappyCompression: {
std::string* compressed = &r->compressed_output;
if (port::Snappy_Compress(r->options.compression_opts, raw.data(),
raw.size(), compressed) &&
GoodCompressionRatio(compressed->size(), raw.size())) {
block_contents = *compressed;
} else {
// Snappy not supported, or not good compression ratio, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
}
case kZlibCompression:
if (port::Zlib_Compress(r->options.compression_opts, raw.data(),
raw.size(), compressed) &&
GoodCompressionRatio(compressed->size(), raw.size())) {
block_contents = *compressed;
} else {
// Zlib not supported, or not good compression ratio, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
case kBZip2Compression:
if (port::BZip2_Compress(r->options.compression_opts, raw.data(),
raw.size(), compressed) &&
GoodCompressionRatio(compressed->size(), raw.size())) {
block_contents = *compressed;
} else {
// BZip not supported, or not good compression ratio, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
case kLZ4Compression:
if (port::LZ4_Compress(r->options.compression_opts, raw.data(),
raw.size(), compressed) &&
GoodCompressionRatio(compressed->size(), raw.size())) {
block_contents = *compressed;
} else {
// LZ4 not supported, or not good compression ratio, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
case kLZ4HCCompression:
if (port::LZ4HC_Compress(r->options.compression_opts, raw.data(),
raw.size(), compressed) &&
GoodCompressionRatio(compressed->size(), raw.size())) {
block_contents = *compressed;
} else {
// LZ4 not supported, or not good compression ratio, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
}
auto type = r->compression_type;
auto block_contents =
CompressBlock(raw_block_contents, r->options.compression_opts, &type,
&r->compressed_output);
WriteRawBlock(block_contents, type, handle);
r->compressed_output.clear();
block->Reset();
}
void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
@@ -364,11 +504,8 @@ Status BlockBasedTableBuilder::Finish() {
// block, we will finish writing all index entries here and flush them
// to storage after metaindex block is written.
if (ok() && !empty_data_block) {
r->internal_comparator.FindShortSuccessor(&r->last_key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, handle_encoding);
r->index_builder->AddEntry(&r->last_key, nullptr /* no next data block */,
r->pending_handle);
}
// Write meta blocks and metaindex block with the following order.
@@ -394,11 +531,12 @@ Status BlockBasedTableBuilder::Finish() {
r->props.filter_policy_name = r->options.filter_policy != nullptr ?
r->options.filter_policy->Name() : "";
r->props.index_size =
r->index_block.CurrentSizeEstimate() + kBlockTrailerSize;
r->index_builder->EstimatedSize() + kBlockTrailerSize;
// Add basic properties
property_block_builder.AddTableProperty(r->props);
// Add user collected properties
NotifyCollectTableCollectorsOnFinish(
r->options.table_properties_collectors,
r->options.info_log.get(),
@@ -425,7 +563,7 @@ Status BlockBasedTableBuilder::Finish() {
// Write index block
if (ok()) {
WriteBlock(&r->index_block, &index_block_handle);
WriteBlock(r->index_builder->Finish(), &index_block_handle);
}
// Write footer