mirror of
https://github.com/XRPLF/rippled.git
synced 2025-12-06 17:27:55 +00:00
Make "Table" pluggable
Summary: This patch makes Table and TableBuilder a abstract class and make all the implementation of the current table into BlockedBasedTable and BlockedBasedTable Builder. Test Plan: Make db_test.cc to work with block based table. Add a new test simple_table_db_test.cc where a different simple table format is implemented. Reviewers: dhruba, haobo, kailiu, emayanke, vamsi Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D13521
This commit is contained in:
470
table/block_based_table_builder.cc
Normal file
470
table/block_based_table_builder.cc
Normal file
@@ -0,0 +1,470 @@
|
||||
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
//
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#include "table/block_based_table_builder.h"
|
||||
|
||||
#include <assert.h>
|
||||
#include <map>
|
||||
|
||||
#include "rocksdb/comparator.h"
|
||||
#include "rocksdb/table.h"
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/filter_policy.h"
|
||||
#include "rocksdb/options.h"
|
||||
#include "table/block_based_table.h"
|
||||
#include "table/block_builder.h"
|
||||
#include "table/filter_block.h"
|
||||
#include "table/format.h"
|
||||
#include "util/coding.h"
|
||||
#include "util/crc32c.h"
|
||||
#include "util/stop_watch.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
namespace {
|
||||
|
||||
struct BytewiseLessThan {
|
||||
bool operator()(const std::string& key1, const std::string& key2) {
|
||||
// smaller entries will be placed in front.
|
||||
return comparator->Compare(key1, key2) <= 0;
|
||||
}
|
||||
const Comparator* comparator = BytewiseComparator();
|
||||
};
|
||||
|
||||
// When writing to a block that requires entries to be sorted by
|
||||
// `BytewiseComparator`, we can buffer the content to `BytewiseSortedMap`
|
||||
// before writng to store.
|
||||
typedef std::map<std::string, std::string, BytewiseLessThan> BytewiseSortedMap;
|
||||
|
||||
void AddStats(BytewiseSortedMap& stats, std::string name, uint64_t val) {
|
||||
assert(stats.find(name) == stats.end());
|
||||
|
||||
std::string dst;
|
||||
PutVarint64(&dst, val);
|
||||
|
||||
stats.insert(
|
||||
std::make_pair(name, dst)
|
||||
);
|
||||
}
|
||||
|
||||
static bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
|
||||
// Check to see if compressed less than 12.5%
|
||||
return compressed_size < raw_size - (raw_size / 8u);
|
||||
}
|
||||
|
||||
// Were we encounter any error occurs during user-defined statistics collection,
|
||||
// we'll write the warning message to info log.
|
||||
void LogStatsCollectionError(
|
||||
Logger* info_log, const std::string& method, const std::string& name) {
|
||||
assert(method == "Add" || method == "Finish");
|
||||
|
||||
std::string msg =
|
||||
"[Warning] encountered error when calling TableStatsCollector::" +
|
||||
method + "() with collector name: " + name;
|
||||
Log(info_log, msg.c_str());
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
struct BlockBasedTableBuilder::Rep {
|
||||
Options options;
|
||||
Options index_block_options;
|
||||
WritableFile* file;
|
||||
uint64_t offset = 0;
|
||||
Status status;
|
||||
BlockBuilder data_block;
|
||||
BlockBuilder index_block;
|
||||
std::string last_key;
|
||||
// Whether enable compression in this table.
|
||||
bool enable_compression;
|
||||
|
||||
uint64_t num_entries = 0;
|
||||
uint64_t num_data_blocks = 0;
|
||||
uint64_t raw_key_size = 0;
|
||||
uint64_t raw_value_size = 0;
|
||||
uint64_t data_size = 0;
|
||||
|
||||
bool closed = false; // Either Finish() or Abandon() has been called.
|
||||
FilterBlockBuilder* filter_block;
|
||||
|
||||
// We do not emit the index entry for a block until we have seen the
|
||||
// first key for the next data block. This allows us to use shorter
|
||||
// keys in the index block. For example, consider a block boundary
|
||||
// between the keys "the quick brown fox" and "the who". We can use
|
||||
// "the r" as the key for the index block entry since it is >= all
|
||||
// entries in the first block and < all entries in subsequent
|
||||
// blocks.
|
||||
//
|
||||
// Invariant: r->pending_index_entry is true only if data_block is empty.
|
||||
bool pending_index_entry;
|
||||
BlockHandle pending_handle; // Handle to add to index block
|
||||
|
||||
std::string compressed_output;
|
||||
|
||||
Rep(const Options& opt, WritableFile* f, bool enable_compression)
|
||||
: options(opt),
|
||||
index_block_options(opt),
|
||||
file(f),
|
||||
data_block(&options),
|
||||
index_block(1, index_block_options.comparator),
|
||||
enable_compression(enable_compression),
|
||||
filter_block(opt.filter_policy == nullptr ? nullptr
|
||||
: new FilterBlockBuilder(opt)),
|
||||
pending_index_entry(false) {
|
||||
}
|
||||
};
|
||||
|
||||
BlockBasedTableBuilder::BlockBasedTableBuilder(const Options& options,
|
||||
WritableFile* file, int level,
|
||||
const bool enable_compression)
|
||||
: TableBuilder(level), rep_(new Rep(options, file, enable_compression)) {
|
||||
if (rep_->filter_block != nullptr) {
|
||||
rep_->filter_block->StartBlock(0);
|
||||
}
|
||||
}
|
||||
|
||||
BlockBasedTableBuilder::~BlockBasedTableBuilder() {
|
||||
assert(rep_->closed); // Catch errors where caller forgot to call Finish()
|
||||
delete rep_->filter_block;
|
||||
delete rep_;
|
||||
}
|
||||
|
||||
void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
|
||||
Rep* r = rep_;
|
||||
assert(!r->closed);
|
||||
if (!ok()) return;
|
||||
if (r->num_entries > 0) {
|
||||
assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
|
||||
}
|
||||
|
||||
const size_t curr_size = r->data_block.CurrentSizeEstimate();
|
||||
const size_t estimated_size_after = r->data_block.EstimateSizeAfterKV(key,
|
||||
value);
|
||||
// Do flush if one of the below two conditions is true:
|
||||
// 1) if the current estimated size already exceeds the block size,
|
||||
// 2) block_size_deviation is set and the estimated size after appending
|
||||
// the kv will exceed the block size and the current size is under the
|
||||
// the deviation.
|
||||
if (curr_size >= r->options.block_size ||
|
||||
(estimated_size_after > r->options.block_size &&
|
||||
r->options.block_size_deviation > 0 &&
|
||||
(curr_size * 100) >
|
||||
r->options.block_size * (100 - r->options.block_size_deviation))) {
|
||||
Flush();
|
||||
}
|
||||
|
||||
if (r->pending_index_entry) {
|
||||
assert(r->data_block.empty());
|
||||
r->options.comparator->FindShortestSeparator(&r->last_key, key);
|
||||
std::string handle_encoding;
|
||||
r->pending_handle.EncodeTo(&handle_encoding);
|
||||
r->index_block.Add(r->last_key, Slice(handle_encoding));
|
||||
r->pending_index_entry = false;
|
||||
}
|
||||
|
||||
if (r->filter_block != nullptr) {
|
||||
r->filter_block->AddKey(key);
|
||||
}
|
||||
|
||||
r->last_key.assign(key.data(), key.size());
|
||||
r->data_block.Add(key, value);
|
||||
r->num_entries++;
|
||||
r->raw_key_size += key.size();
|
||||
r->raw_value_size += value.size();
|
||||
|
||||
for (auto collector : r->options.table_stats_collectors) {
|
||||
Status s = collector->Add(key, value);
|
||||
if (!s.ok()) {
|
||||
LogStatsCollectionError(
|
||||
r->options.info_log.get(),
|
||||
"Add", /* method */
|
||||
collector->Name()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void BlockBasedTableBuilder::Flush() {
|
||||
Rep* r = rep_;
|
||||
assert(!r->closed);
|
||||
if (!ok()) return;
|
||||
if (r->data_block.empty()) return;
|
||||
assert(!r->pending_index_entry);
|
||||
WriteBlock(&r->data_block, &r->pending_handle);
|
||||
if (ok()) {
|
||||
r->pending_index_entry = true;
|
||||
r->status = r->file->Flush();
|
||||
}
|
||||
if (r->filter_block != nullptr) {
|
||||
r->filter_block->StartBlock(r->offset);
|
||||
}
|
||||
r->data_size = r->offset;
|
||||
++r->num_data_blocks;
|
||||
}
|
||||
|
||||
void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
|
||||
BlockHandle* handle) {
|
||||
// File format contains a sequence of blocks where each block has:
|
||||
// block_data: uint8[n]
|
||||
// type: uint8
|
||||
// crc: uint32
|
||||
assert(ok());
|
||||
Rep* r = rep_;
|
||||
Slice raw = block->Finish();
|
||||
|
||||
Slice block_contents;
|
||||
std::string* compressed = &r->compressed_output;
|
||||
CompressionType type;
|
||||
if (!r->enable_compression) {
|
||||
// disable compression
|
||||
type = kNoCompression;
|
||||
} else {
|
||||
// If the use has specified a different compression level for each level,
|
||||
// then pick the compresison for that level.
|
||||
if (!r->options.compression_per_level.empty()) {
|
||||
const int n = r->options.compression_per_level.size();
|
||||
// It is possible for level_ to be -1; in that case, we use level
|
||||
// 0's compression. This occurs mostly in backwards compatibility
|
||||
// situations when the builder doesn't know what level the file
|
||||
// belongs to. Likewise, if level_ is beyond the end of the
|
||||
// specified compression levels, use the last value.
|
||||
type = r->options.compression_per_level[std::max(0,
|
||||
std::min(level_, n))];
|
||||
} else {
|
||||
type = r->options.compression;
|
||||
}
|
||||
}
|
||||
switch (type) {
|
||||
case kNoCompression:
|
||||
block_contents = raw;
|
||||
break;
|
||||
|
||||
case kSnappyCompression: {
|
||||
std::string* compressed = &r->compressed_output;
|
||||
if (port::Snappy_Compress(r->options.compression_opts, raw.data(),
|
||||
raw.size(), compressed) &&
|
||||
GoodCompressionRatio(compressed->size(), raw.size())) {
|
||||
block_contents = *compressed;
|
||||
} else {
|
||||
// Snappy not supported, or not good compression ratio, so just
|
||||
// store uncompressed form
|
||||
block_contents = raw;
|
||||
type = kNoCompression;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case kZlibCompression:
|
||||
if (port::Zlib_Compress(r->options.compression_opts, raw.data(),
|
||||
raw.size(), compressed) &&
|
||||
GoodCompressionRatio(compressed->size(), raw.size())) {
|
||||
block_contents = *compressed;
|
||||
} else {
|
||||
// Zlib not supported, or not good compression ratio, so just
|
||||
// store uncompressed form
|
||||
block_contents = raw;
|
||||
type = kNoCompression;
|
||||
}
|
||||
break;
|
||||
case kBZip2Compression:
|
||||
if (port::BZip2_Compress(r->options.compression_opts, raw.data(),
|
||||
raw.size(), compressed) &&
|
||||
GoodCompressionRatio(compressed->size(), raw.size())) {
|
||||
block_contents = *compressed;
|
||||
} else {
|
||||
// BZip not supported, or not good compression ratio, so just
|
||||
// store uncompressed form
|
||||
block_contents = raw;
|
||||
type = kNoCompression;
|
||||
}
|
||||
break;
|
||||
}
|
||||
WriteRawBlock(block_contents, type, handle);
|
||||
r->compressed_output.clear();
|
||||
block->Reset();
|
||||
}
|
||||
|
||||
void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
|
||||
CompressionType type,
|
||||
BlockHandle* handle) {
|
||||
Rep* r = rep_;
|
||||
StopWatch sw(r->options.env, r->options.statistics, WRITE_RAW_BLOCK_MICROS);
|
||||
handle->set_offset(r->offset);
|
||||
handle->set_size(block_contents.size());
|
||||
r->status = r->file->Append(block_contents);
|
||||
if (r->status.ok()) {
|
||||
char trailer[kBlockTrailerSize];
|
||||
trailer[0] = type;
|
||||
uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
|
||||
crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type
|
||||
EncodeFixed32(trailer+1, crc32c::Mask(crc));
|
||||
r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
|
||||
if (r->status.ok()) {
|
||||
r->offset += block_contents.size() + kBlockTrailerSize;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Status BlockBasedTableBuilder::status() const {
|
||||
return rep_->status;
|
||||
}
|
||||
|
||||
Status BlockBasedTableBuilder::Finish() {
|
||||
Rep* r = rep_;
|
||||
Flush();
|
||||
assert(!r->closed);
|
||||
r->closed = true;
|
||||
|
||||
BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
|
||||
|
||||
// Write filter block
|
||||
if (ok() && r->filter_block != nullptr) {
|
||||
WriteRawBlock(r->filter_block->Finish(), kNoCompression,
|
||||
&filter_block_handle);
|
||||
}
|
||||
|
||||
// To make sure stats block is able to keep the accurate size of index
|
||||
// block, we will finish writing all index entries here and flush them
|
||||
// to storage after metaindex block is written.
|
||||
if (ok() && (r->pending_index_entry)) {
|
||||
r->options.comparator->FindShortSuccessor(&r->last_key);
|
||||
std::string handle_encoding;
|
||||
r->pending_handle.EncodeTo(&handle_encoding);
|
||||
r->index_block.Add(r->last_key, Slice(handle_encoding));
|
||||
r->pending_index_entry = false;
|
||||
}
|
||||
|
||||
// Write meta blocks and metaindex block with the following order.
|
||||
// 1. [meta block: filter]
|
||||
// 2. [meta block: stats]
|
||||
// 3. [metaindex block]
|
||||
if (ok()) {
|
||||
// We use `BytewiseComparator` as the comparator for meta block.
|
||||
BlockBuilder meta_index_block(
|
||||
r->options.block_restart_interval,
|
||||
BytewiseComparator()
|
||||
);
|
||||
// Key: meta block name
|
||||
// Value: block handle to that meta block
|
||||
BytewiseSortedMap meta_block_handles;
|
||||
|
||||
// Write filter block.
|
||||
if (r->filter_block != nullptr) {
|
||||
// Add mapping from "<filter_block_prefix>.Name" to location
|
||||
// of filter data.
|
||||
std::string key = BlockBasedTable::kFilterBlockPrefix;
|
||||
key.append(r->options.filter_policy->Name());
|
||||
std::string handle_encoding;
|
||||
filter_block_handle.EncodeTo(&handle_encoding);
|
||||
meta_block_handles.insert(
|
||||
std::make_pair(key, handle_encoding)
|
||||
);
|
||||
}
|
||||
|
||||
// Write stats block.
|
||||
{
|
||||
BlockBuilder stats_block(
|
||||
r->options.block_restart_interval,
|
||||
BytewiseComparator()
|
||||
);
|
||||
|
||||
BytewiseSortedMap stats;
|
||||
|
||||
// Add basic stats
|
||||
AddStats(stats, TableStatsNames::kRawKeySize, r->raw_key_size);
|
||||
AddStats(stats, TableStatsNames::kRawValueSize, r->raw_value_size);
|
||||
AddStats(stats, TableStatsNames::kDataSize, r->data_size);
|
||||
AddStats(
|
||||
stats,
|
||||
TableStatsNames::kIndexSize,
|
||||
r->index_block.CurrentSizeEstimate() + kBlockTrailerSize
|
||||
);
|
||||
AddStats(stats, TableStatsNames::kNumEntries, r->num_entries);
|
||||
AddStats(stats, TableStatsNames::kNumDataBlocks, r->num_data_blocks);
|
||||
if (r->filter_block != nullptr) {
|
||||
stats.insert(std::make_pair(
|
||||
TableStatsNames::kFilterPolicy,
|
||||
r->options.filter_policy->Name()
|
||||
));
|
||||
}
|
||||
|
||||
for (auto collector : r->options.table_stats_collectors) {
|
||||
TableStats::UserCollectedStats user_collected_stats;
|
||||
Status s =
|
||||
collector->Finish(&user_collected_stats);
|
||||
|
||||
if (!s.ok()) {
|
||||
LogStatsCollectionError(
|
||||
r->options.info_log.get(),
|
||||
"Finish", /* method */
|
||||
collector->Name()
|
||||
);
|
||||
} else {
|
||||
stats.insert(
|
||||
user_collected_stats.begin(),
|
||||
user_collected_stats.end()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto& stat : stats) {
|
||||
stats_block.Add(stat.first, stat.second);
|
||||
}
|
||||
|
||||
BlockHandle stats_block_handle;
|
||||
WriteBlock(&stats_block, &stats_block_handle);
|
||||
|
||||
std::string handle_encoding;
|
||||
stats_block_handle.EncodeTo(&handle_encoding);
|
||||
meta_block_handles.insert(
|
||||
std::make_pair(BlockBasedTable::kStatsBlock, handle_encoding)
|
||||
);
|
||||
} // end of stats block writing
|
||||
|
||||
for (const auto& metablock : meta_block_handles) {
|
||||
meta_index_block.Add(metablock.first, metablock.second);
|
||||
}
|
||||
|
||||
WriteBlock(&meta_index_block, &metaindex_block_handle);
|
||||
} // meta blocks and metaindex block.
|
||||
|
||||
// Write index block
|
||||
if (ok()) {
|
||||
WriteBlock(&r->index_block, &index_block_handle);
|
||||
}
|
||||
|
||||
// Write footer
|
||||
if (ok()) {
|
||||
Footer footer;
|
||||
footer.set_metaindex_handle(metaindex_block_handle);
|
||||
footer.set_index_handle(index_block_handle);
|
||||
std::string footer_encoding;
|
||||
footer.EncodeTo(&footer_encoding);
|
||||
r->status = r->file->Append(footer_encoding);
|
||||
if (r->status.ok()) {
|
||||
r->offset += footer_encoding.size();
|
||||
}
|
||||
}
|
||||
return r->status;
|
||||
}
|
||||
|
||||
void BlockBasedTableBuilder::Abandon() {
|
||||
Rep* r = rep_;
|
||||
assert(!r->closed);
|
||||
r->closed = true;
|
||||
}
|
||||
|
||||
uint64_t BlockBasedTableBuilder::NumEntries() const {
|
||||
return rep_->num_entries;
|
||||
}
|
||||
|
||||
uint64_t BlockBasedTableBuilder::FileSize() const {
|
||||
return rep_->offset;
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
Reference in New Issue
Block a user