[RocksDB] [Column Family] Interface proposal

Summary:
<This diff is for Column Family branch>

Sharing some of the work I've done so far. This diff compiles and passes the tests.

The biggest change is in options.h - I broke down Options into two parts - DBOptions and ColumnFamilyOptions. DBOptions is DB-specific (env, create_if_missing, block_cache, etc.) and ColumnFamilyOptions is column family-specific (all compaction options, compresion options, etc.). Note that this does not break backwards compatibility at all.

Further, I created DBWithColumnFamily which inherits DB interface and adds new functions with column family support. Clients can transparently switch to DBWithColumnFamily and it will not break their backwards compatibility.
There are few methods worth checking out: ListColumnFamilies(), MultiNewIterator(), MultiGet() and GetSnapshot(). [GetSnapshot() returns the snapshot across all column families for now - I think that's what we agreed on]

Finally, I made small changes to WriteBatch so we are able to atomically insert data across column families.

Please provide feedback.

Test Plan: make check works, the code is backward compatible

Reviewers: dhruba, haobo, sdong, kailiu, emayanke

CC: leveldb

Differential Revision: https://reviews.facebook.net/D14445
This commit is contained in:
Igor Canadi
2013-12-03 11:14:09 -08:00
parent 14995a8ff3
commit 9385a5247e
15 changed files with 827 additions and 437 deletions

View File

@@ -38,6 +38,7 @@
#include "port/port.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/column_family.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/statistics.h"
@@ -59,6 +60,8 @@
namespace rocksdb {
const Slice& default_column_family_name("default");
void dumpLeveldbBuildVersion(Logger * log);
// Information kept for every waiting writer
@@ -1205,7 +1208,8 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
return s;
}
void DBImpl::CompactRange(const Slice* begin, const Slice* end,
void DBImpl::CompactRange(const ColumnFamilyHandle& column_family,
const Slice* begin, const Slice* end,
bool reduce_level, int target_level) {
int max_level_with_files = 1;
{
@@ -1300,19 +1304,20 @@ void DBImpl::ReFitLevel(int level, int target_level) {
bg_work_gate_closed_ = false;
}
int DBImpl::NumberLevels() {
int DBImpl::NumberLevels(const ColumnFamilyHandle& column_family) {
return options_.num_levels;
}
int DBImpl::MaxMemCompactionLevel() {
int DBImpl::MaxMemCompactionLevel(const ColumnFamilyHandle& column_family) {
return options_.max_mem_compaction_level;
}
int DBImpl::Level0StopWriteTrigger() {
int DBImpl::Level0StopWriteTrigger(const ColumnFamilyHandle& column_family) {
return options_.level0_stop_writes_trigger;
}
Status DBImpl::Flush(const FlushOptions& options) {
Status DBImpl::Flush(const FlushOptions& options,
const ColumnFamilyHandle& column_family) {
Status status = FlushMemTable(options);
return status;
}
@@ -2583,7 +2588,7 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
}
Status DBImpl::Get(const ReadOptions& options,
const Slice& key,
const ColumnFamilyHandle& column_family, const Slice& key,
std::string* value) {
return GetImpl(options, key, value);
}
@@ -2657,9 +2662,10 @@ Status DBImpl::GetImpl(const ReadOptions& options,
return s;
}
std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
const std::vector<Slice>& keys,
std::vector<std::string>* values) {
std::vector<Status> DBImpl::MultiGet(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
StopWatch sw(env_, options_.statistics.get(), DB_MULTIGET);
SequenceNumber snapshot;
@@ -2743,8 +2749,8 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
}
bool DBImpl::KeyMayExist(const ReadOptions& options,
const Slice& key,
std::string* value,
const ColumnFamilyHandle& column_family,
const Slice& key, std::string* value,
bool* value_found) {
if (value_found != nullptr) {
// falsify later if key-may-exist but can't fetch value
@@ -2760,7 +2766,8 @@ bool DBImpl::KeyMayExist(const ReadOptions& options,
return s.ok() || s.IsIncomplete();
}
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
Iterator* DBImpl::NewIterator(const ReadOptions& options,
const ColumnFamilyHandle& column_family) {
SequenceNumber latest_snapshot;
Iterator* iter = NewInternalIterator(options, &latest_snapshot);
iter = NewDBIterator(
@@ -2777,6 +2784,14 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options) {
return iter;
}
Status DBImpl::NewIterators(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle>& column_family,
std::vector<Iterator*>* iterators) {
// TODO
return Status::NotSupported("Not yet!");
}
const Snapshot* DBImpl::GetSnapshot() {
MutexLock l(&mutex_);
return snapshots_.New(versions_->LastSequence());
@@ -2788,21 +2803,26 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
}
// Convenience methods
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
return DB::Put(o, key, val);
Status DBImpl::Put(const WriteOptions& o,
const ColumnFamilyHandle& column_family, const Slice& key,
const Slice& val) {
return DB::Put(o, column_family, key, val);
}
Status DBImpl::Merge(const WriteOptions& o, const Slice& key,
Status DBImpl::Merge(const WriteOptions& o,
const ColumnFamilyHandle& column_family, const Slice& key,
const Slice& val) {
if (!options_.merge_operator) {
return Status::NotSupported("Provide a merge_operator when opening DB");
} else {
return DB::Merge(o, key, val);
return DB::Merge(o, column_family, key, val);
}
}
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
return DB::Delete(options, key);
Status DBImpl::Delete(const WriteOptions& options,
const ColumnFamilyHandle& column_family,
const Slice& key) {
return DB::Delete(options, column_family, key);
}
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
@@ -3199,11 +3219,13 @@ Env* DBImpl::GetEnv() const {
return env_;
}
const Options& DBImpl::GetOptions() const {
const Options& DBImpl::GetOptions(const ColumnFamilyHandle& column_family)
const {
return options_;
}
bool DBImpl::GetProperty(const Slice& property, std::string* value) {
bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
const Slice& property, std::string* value) {
value->clear();
MutexLock l(&mutex_);
@@ -3480,9 +3502,8 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
return false;
}
void DBImpl::GetApproximateSizes(
const Range* range, int n,
uint64_t* sizes) {
void DBImpl::GetApproximateSizes(const ColumnFamilyHandle& column_family,
const Range* range, int n, uint64_t* sizes) {
// TODO(opt): better implementation
Version* v;
{
@@ -3616,25 +3637,38 @@ Status DBImpl::GetDbIdentity(std::string& identity) {
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
Status DB::Put(const WriteOptions& opt, const ColumnFamilyHandle& column_family,
const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
batch.Put(column_family, key, value);
return Write(opt, &batch);
}
Status DB::Delete(const WriteOptions& opt, const Slice& key) {
Status DB::Delete(const WriteOptions& opt,
const ColumnFamilyHandle& column_family, const Slice& key) {
WriteBatch batch;
batch.Delete(key);
batch.Delete(column_family, key);
return Write(opt, &batch);
}
Status DB::Merge(const WriteOptions& opt, const Slice& key,
Status DB::Merge(const WriteOptions& opt,
const ColumnFamilyHandle& column_family, const Slice& key,
const Slice& value) {
WriteBatch batch;
batch.Merge(key, value);
batch.Merge(column_family, key, value);
return Write(opt, &batch);
}
Status DB::OpenColumnFamily(const ColumnFamilyOptions& options,
const Slice& column_family,
ColumnFamilyHandle* handle) {
return Status::NotSupported("working on it");
}
Status DB::DropColumnFamily(const ColumnFamilyHandle& column_family) {
return Status::NotSupported("working on it");
}
DB::~DB() { }
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
@@ -3706,6 +3740,21 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
return s;
}
Status DB::OpenWithColumnFamilies(
const DBOptions& db_options, const std::string& name,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle>* handles, DB** dbptr) {
// TODO
return Status::NotSupported("Working on it");
}
Status DB::ListColumnFamilies(
const DBOptions& db_options, const std::string& name,
const std::vector<std::string>* column_families) {
// TODO
return Status::NotSupported("Working on it");
}
Snapshot::~Snapshot() {
}