Add column family information to WAL

Summary:
I have added three new value types:
* kTypeColumnFamilyDeletion
* kTypeColumnFamilyValue
* kTypeColumnFamilyMerge
which include column family Varint32 before the data (value, deletion and merge). These values are used only in WAL (not in memtables yet).

This endeavour required changing some WriteBatch internals.

Test Plan: Added a unittest

Reviewers: dhruba, haobo, sdong, kailiu

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15045
This commit is contained in:
Igor Canadi
2014-01-07 14:41:42 -08:00
parent 72918efffe
commit 19e3ee64ac
9 changed files with 157 additions and 55 deletions

View File

@@ -15,6 +15,9 @@
// kTypeValue varstring varstring
// kTypeMerge varstring varstring
// kTypeDeletion varstring
// kTypeColumnFamilyValue varint32 varstring varstring
// kTypeColumnFamilyMerge varint32 varstring varstring
// kTypeColumnFamilyDeletion varint32 varstring varstring
// varstring :=
// len: varint32
// data: uint8[len]
@@ -87,28 +90,44 @@ Status WriteBatch::Iterate(Handler* handler) const {
while (!input.empty() && handler->Continue()) {
char tag = input[0];
input.remove_prefix(1);
uint32_t column_family = 0; // default
switch (tag) {
case kTypeColumnFamilyValue:
if (!GetVarint32(&input, &column_family)) {
return Status::Corruption("bad WriteBatch Put");
}
// intentional fallthrough
case kTypeValue:
if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value)) {
handler->PutCF(default_column_family, key, value);
handler->PutCF(column_family, key, value);
found++;
} else {
return Status::Corruption("bad WriteBatch Put");
}
break;
case kTypeColumnFamilyDeletion:
if (!GetVarint32(&input, &column_family)) {
return Status::Corruption("bad WriteBatch Delete");
}
// intentional fallthrough
case kTypeDeletion:
if (GetLengthPrefixedSlice(&input, &key)) {
handler->DeleteCF(default_column_family, key);
handler->DeleteCF(column_family, key);
found++;
} else {
return Status::Corruption("bad WriteBatch Delete");
}
break;
case kTypeColumnFamilyMerge:
if (!GetVarint32(&input, &column_family)) {
return Status::Corruption("bad WriteBatch Merge");
}
// intentional fallthrough
case kTypeMerge:
if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value)) {
handler->MergeCF(default_column_family, key, value);
handler->MergeCF(column_family, key, value);
found++;
} else {
return Status::Corruption("bad WriteBatch Merge");
@@ -148,33 +167,53 @@ void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
EncodeFixed64(&b->rep_[0], seq);
}
void WriteBatch::Put(const ColumnFamilyHandle& column_family, const Slice& key,
void WriteBatch::Put(uint32_t column_family_id, const Slice& key,
const Slice& value) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeValue));
if (column_family_id == 0) {
// save some data on disk by not writing default column family
rep_.push_back(static_cast<char>(kTypeValue));
} else {
rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
PutVarint32(&rep_, column_family_id);
}
PutLengthPrefixedSlice(&rep_, key);
PutLengthPrefixedSlice(&rep_, value);
}
void WriteBatch::Put(const ColumnFamilyHandle& column_family,
const SliceParts& key, const SliceParts& value) {
void WriteBatch::Put(uint32_t column_family_id, const SliceParts& key,
const SliceParts& value) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeValue));
if (column_family_id == 0) {
rep_.push_back(static_cast<char>(kTypeValue));
} else {
rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
PutVarint32(&rep_, column_family_id);
}
PutLengthPrefixedSliceParts(&rep_, key);
PutLengthPrefixedSliceParts(&rep_, value);
}
void WriteBatch::Delete(const ColumnFamilyHandle& column_family,
const Slice& key) {
void WriteBatch::Delete(uint32_t column_family_id, const Slice& key) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeDeletion));
if (column_family_id == 0) {
rep_.push_back(static_cast<char>(kTypeDeletion));
} else {
rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
PutVarint32(&rep_, column_family_id);
}
PutLengthPrefixedSlice(&rep_, key);
}
void WriteBatch::Merge(const ColumnFamilyHandle& column_family,
const Slice& key, const Slice& value) {
void WriteBatch::Merge(uint32_t column_family_id, const Slice& key,
const Slice& value) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeMerge));
if (column_family_id == 0) {
rep_.push_back(static_cast<char>(kTypeMerge));
} else {
rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
PutVarint32(&rep_, column_family_id);
}
PutLengthPrefixedSlice(&rep_, key);
PutLengthPrefixedSlice(&rep_, value);
}
@@ -207,7 +246,7 @@ class MemTableInserter : public WriteBatch::Handler {
}
}
virtual void PutCF(const ColumnFamilyHandle& column_family, const Slice& key,
virtual void PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
if (options_->inplace_update_support
&& mem_->Update(sequence_, kTypeValue, key, value)) {
@@ -217,13 +256,12 @@ class MemTableInserter : public WriteBatch::Handler {
}
sequence_++;
}
virtual void MergeCF(const ColumnFamilyHandle& column_family,
const Slice& key, const Slice& value) {
virtual void MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
mem_->Add(sequence_, kTypeMerge, key, value);
sequence_++;
}
virtual void DeleteCF(const ColumnFamilyHandle& column_family,
const Slice& key) {
virtual void DeleteCF(uint32_t column_family_id, const Slice& key) {
if (filter_deletes_) {
SnapshotImpl read_from_snapshot;
read_from_snapshot.number_ = sequence_;