diff --git a/db/db_impl.cc b/db/db_impl.cc index e6c1b56dd5..0909c56947 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3817,14 +3817,14 @@ Status DBImpl::GetDbIdentity(std::string& identity) { Status DB::Put(const WriteOptions& opt, const ColumnFamilyHandle& column_family, const Slice& key, const Slice& value) { WriteBatch batch; - batch.Put(column_family, key, value); + batch.Put(column_family.id, key, value); return Write(opt, &batch); } Status DB::Delete(const WriteOptions& opt, const ColumnFamilyHandle& column_family, const Slice& key) { WriteBatch batch; - batch.Delete(column_family, key); + batch.Delete(column_family.id, key); return Write(opt, &batch); } @@ -3832,7 +3832,7 @@ Status DB::Merge(const WriteOptions& opt, const ColumnFamilyHandle& column_family, const Slice& key, const Slice& value) { WriteBatch batch; - batch.Merge(column_family, key, value); + batch.Merge(column_family.id, key, value); return Write(opt, &batch); } diff --git a/db/db_iter.cc b/db/db_iter.cc index 596a9f6511..71bb2e57ce 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -226,6 +226,9 @@ void DBIter::FindNextUserEntry(bool skipping) { valid_ = true; MergeValuesNewToOld(); // Go to a different state machine return; + case kTypeColumnFamilyDeletion: + case kTypeColumnFamilyValue: + case kTypeColumnFamilyMerge: case kTypeLogData: assert(false); break; diff --git a/db/db_test.cc b/db/db_test.cc index 577655b368..3659e8d844 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -519,6 +519,9 @@ class DBTest { case kTypeDeletion: result += "DEL"; break; + case kTypeColumnFamilyDeletion: + case kTypeColumnFamilyValue: + case kTypeColumnFamilyMerge: case kTypeLogData: assert(false); break; diff --git a/db/dbformat.h b/db/dbformat.h index 64a2c9f052..82031cf5cc 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -29,7 +29,10 @@ enum ValueType { kTypeDeletion = 0x0, kTypeValue = 0x1, kTypeMerge = 0x2, - kTypeLogData = 0x3 + kTypeLogData = 0x3, + kTypeColumnFamilyDeletion = 0x4, + 
kTypeColumnFamilyValue = 0x5, + kTypeColumnFamilyMerge = 0x6, }; // kValueTypeForSeek defines the ValueType that should be passed when // constructing a ParsedInternalKey object for seeking to a particular diff --git a/db/memtable.cc b/db/memtable.cc index 675a314ff5..2dba364b0e 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -247,6 +247,9 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, } break; } + case kTypeColumnFamilyDeletion: + case kTypeColumnFamilyValue: + case kTypeColumnFamilyMerge: case kTypeLogData: assert(false); break; diff --git a/db/version_set.cc b/db/version_set.cc index fbb46404ac..f65851329e 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -379,6 +379,9 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ } return true; + case kTypeColumnFamilyDeletion: + case kTypeColumnFamilyValue: + case kTypeColumnFamilyMerge: case kTypeLogData: assert(false); break; diff --git a/db/write_batch.cc b/db/write_batch.cc index 9d31905792..5a5d7e2788 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -15,6 +15,9 @@ // kTypeValue varstring varstring // kTypeMerge varstring varstring // kTypeDeletion varstring +// kTypeColumnFamilyValue varint32 varstring varstring +// kTypeColumnFamilyMerge varint32 varstring varstring +// kTypeColumnFamilyDeletion varint32 varstring // varstring := // len: varint32 // data: uint8[len] @@ -87,28 +90,44 @@ Status WriteBatch::Iterate(Handler* handler) const { while (!input.empty() && handler->Continue()) { char tag = input[0]; input.remove_prefix(1); + uint32_t column_family = 0; // default switch (tag) { + case kTypeColumnFamilyValue: + if (!GetVarint32(&input, &column_family)) { + return Status::Corruption("bad WriteBatch Put"); + } + // intentional fallthrough case kTypeValue: if (GetLengthPrefixedSlice(&input, &key) && GetLengthPrefixedSlice(&input, &value)) { - handler->PutCF(default_column_family, key, value); + handler->PutCF(column_family, 
key, value); found++; } else { return Status::Corruption("bad WriteBatch Put"); } break; + case kTypeColumnFamilyDeletion: + if (!GetVarint32(&input, &column_family)) { + return Status::Corruption("bad WriteBatch Delete"); + } + // intentional fallthrough case kTypeDeletion: if (GetLengthPrefixedSlice(&input, &key)) { - handler->DeleteCF(default_column_family, key); + handler->DeleteCF(column_family, key); found++; } else { return Status::Corruption("bad WriteBatch Delete"); } break; + case kTypeColumnFamilyMerge: + if (!GetVarint32(&input, &column_family)) { + return Status::Corruption("bad WriteBatch Merge"); + } + // intentional fallthrough case kTypeMerge: if (GetLengthPrefixedSlice(&input, &key) && GetLengthPrefixedSlice(&input, &value)) { - handler->MergeCF(default_column_family, key, value); + handler->MergeCF(column_family, key, value); found++; } else { return Status::Corruption("bad WriteBatch Merge"); @@ -148,33 +167,53 @@ void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) { EncodeFixed64(&b->rep_[0], seq); } -void WriteBatch::Put(const ColumnFamilyHandle& column_family, const Slice& key, +void WriteBatch::Put(uint32_t column_family_id, const Slice& key, const Slice& value) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); - rep_.push_back(static_cast(kTypeValue)); + if (column_family_id == 0) { + // save some data on disk by not writing default column family + rep_.push_back(static_cast(kTypeValue)); + } else { + rep_.push_back(static_cast(kTypeColumnFamilyValue)); + PutVarint32(&rep_, column_family_id); + } PutLengthPrefixedSlice(&rep_, key); PutLengthPrefixedSlice(&rep_, value); } -void WriteBatch::Put(const ColumnFamilyHandle& column_family, - const SliceParts& key, const SliceParts& value) { +void WriteBatch::Put(uint32_t column_family_id, const SliceParts& key, + const SliceParts& value) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); - rep_.push_back(static_cast(kTypeValue)); 
+ if (column_family_id == 0) { + rep_.push_back(static_cast(kTypeValue)); + } else { + rep_.push_back(static_cast(kTypeColumnFamilyValue)); + PutVarint32(&rep_, column_family_id); + } PutLengthPrefixedSliceParts(&rep_, key); PutLengthPrefixedSliceParts(&rep_, value); } -void WriteBatch::Delete(const ColumnFamilyHandle& column_family, - const Slice& key) { +void WriteBatch::Delete(uint32_t column_family_id, const Slice& key) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); - rep_.push_back(static_cast(kTypeDeletion)); + if (column_family_id == 0) { + rep_.push_back(static_cast(kTypeDeletion)); + } else { + rep_.push_back(static_cast(kTypeColumnFamilyDeletion)); + PutVarint32(&rep_, column_family_id); + } PutLengthPrefixedSlice(&rep_, key); } -void WriteBatch::Merge(const ColumnFamilyHandle& column_family, - const Slice& key, const Slice& value) { +void WriteBatch::Merge(uint32_t column_family_id, const Slice& key, + const Slice& value) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); - rep_.push_back(static_cast(kTypeMerge)); + if (column_family_id == 0) { + rep_.push_back(static_cast(kTypeMerge)); + } else { + rep_.push_back(static_cast(kTypeColumnFamilyMerge)); + PutVarint32(&rep_, column_family_id); + } PutLengthPrefixedSlice(&rep_, key); PutLengthPrefixedSlice(&rep_, value); } @@ -207,7 +246,7 @@ class MemTableInserter : public WriteBatch::Handler { } } - virtual void PutCF(const ColumnFamilyHandle& column_family, const Slice& key, + virtual void PutCF(uint32_t column_family_id, const Slice& key, const Slice& value) { if (options_->inplace_update_support && mem_->Update(sequence_, kTypeValue, key, value)) { @@ -217,13 +256,12 @@ class MemTableInserter : public WriteBatch::Handler { } sequence_++; } - virtual void MergeCF(const ColumnFamilyHandle& column_family, - const Slice& key, const Slice& value) { + virtual void MergeCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { 
mem_->Add(sequence_, kTypeMerge, key, value); sequence_++; } - virtual void DeleteCF(const ColumnFamilyHandle& column_family, - const Slice& key) { + virtual void DeleteCF(uint32_t column_family_id, const Slice& key) { if (filter_deletes_) { SnapshotImpl read_from_snapshot; read_from_snapshot.number_ = sequence_; diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index ff9aa63eec..490a4401ff 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -56,6 +56,9 @@ static std::string PrintContents(WriteBatch* b) { state.append(")"); count++; break; + case kTypeColumnFamilyDeletion: + case kTypeColumnFamilyValue: + case kTypeColumnFamilyMerge: case kTypeLogData: assert(false); break; @@ -143,17 +146,34 @@ TEST(WriteBatchTest, Append) { namespace { struct TestHandler : public WriteBatch::Handler { std::string seen; - virtual void Put(const Slice& key, const Slice& value) { - seen += "Put(" + key.ToString() + ", " + value.ToString() + ")"; + virtual void PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { + if (column_family_id == 0) { + seen += "Put(" + key.ToString() + ", " + value.ToString() + ")"; + } else { + seen += "PutCF(" + std::to_string(column_family_id) + ", " + + key.ToString() + ", " + value.ToString() + ")"; + } } - virtual void Merge(const Slice& key, const Slice& value) { - seen += "Merge(" + key.ToString() + ", " + value.ToString() + ")"; + virtual void MergeCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { + if (column_family_id == 0) { + seen += "Merge(" + key.ToString() + ", " + value.ToString() + ")"; + } else { + seen += "MergeCF(" + std::to_string(column_family_id) + ", " + + key.ToString() + ", " + value.ToString() + ")"; + } } virtual void LogData(const Slice& blob) { seen += "LogData(" + blob.ToString() + ")"; } - virtual void Delete(const Slice& key) { - seen += "Delete(" + key.ToString() + ")"; + virtual void DeleteCF(uint32_t column_family_id, const Slice& key) { + if 
(column_family_id == 0) { + seen += "Delete(" + key.ToString() + ")"; + } else { + seen += "DeleteCF(" + std::to_string(column_family_id) + ", " + + key.ToString() + ")"; + } } }; } @@ -193,21 +213,23 @@ TEST(WriteBatchTest, Continue) { struct Handler : public TestHandler { int num_seen = 0; - virtual void Put(const Slice& key, const Slice& value) { + virtual void PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { ++num_seen; - TestHandler::Put(key, value); + TestHandler::PutCF(column_family_id, key, value); } - virtual void Merge(const Slice& key, const Slice& value) { + virtual void MergeCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { ++num_seen; - TestHandler::Merge(key, value); + TestHandler::MergeCF(column_family_id, key, value); } virtual void LogData(const Slice& blob) { ++num_seen; TestHandler::LogData(blob); } - virtual void Delete(const Slice& key) { + virtual void DeleteCF(uint32_t column_family_id, const Slice& key) { ++num_seen; - TestHandler::Delete(key); + TestHandler::DeleteCF(column_family_id, key); } virtual bool Continue() override { return num_seen < 3; @@ -255,6 +277,29 @@ TEST(WriteBatchTest, PutGatherSlices) { ASSERT_EQ(3, batch.Count()); } +TEST(WriteBatchTest, ColumnFamiliesBatchTest) { + WriteBatch batch; + batch.Put(0, Slice("foo"), Slice("bar")); + batch.Put(2, Slice("twofoo"), Slice("bar2")); + batch.Put(8, Slice("eightfoo"), Slice("bar8")); + batch.Delete(8, Slice("eightfoo")); + batch.Merge(3, Slice("threethree"), Slice("3three")); + batch.Put(0, Slice("foo"), Slice("bar")); + batch.Merge(Slice("omom"), Slice("nom")); + + TestHandler handler; + batch.Iterate(&handler); + ASSERT_EQ( + "Put(foo, bar)" + "PutCF(2, twofoo, bar2)" + "PutCF(8, eightfoo, bar8)" + "DeleteCF(8, eightfoo)" + "MergeCF(3, threethree, 3three)" + "Put(foo, bar)" + "Merge(omom, nom)", + handler.seen); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/include/rocksdb/write_batch.h 
b/include/rocksdb/write_batch.h index 9e92f21c5a..bc1d63ce4c 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -40,33 +40,31 @@ class WriteBatch { ~WriteBatch(); // Store the mapping "key->value" in the database. - void Put(const ColumnFamilyHandle& column_family, const Slice& key, - const Slice& value); + void Put(uint32_t column_family_id, const Slice& key, const Slice& value); void Put(const Slice& key, const Slice& value) { - Put(default_column_family, key, value); + Put(0, key, value); } // Variant of Put() that gathers output like writev(2). The key and value // that will be written to the database are concatentations of arrays of // slices. - void Put(const ColumnFamilyHandle& column_family, const SliceParts& key, + void Put(uint32_t column_family_id, const SliceParts& key, const SliceParts& value); void Put(const SliceParts& key, const SliceParts& value) { - Put(default_column_family, key, value); + Put(0, key, value); } // Merge "value" with the existing value of "key" in the database. // "key->merge(existing, value)" - void Merge(const ColumnFamilyHandle& column_family, const Slice& key, - const Slice& value); + void Merge(uint32_t column_family_id, const Slice& key, const Slice& value); void Merge(const Slice& key, const Slice& value) { - Merge(default_column_family, key, value); + Merge(0, key, value); } // If the database contains a mapping for "key", erase it. Else do nothing. - void Delete(const ColumnFamilyHandle& column_family, const Slice& key); + void Delete(uint32_t column_family_id, const Slice& key); void Delete(const Slice& key) { - Delete(default_column_family, key); + Delete(0, key); } // Append a blob of arbitrary size to the records in this batch. 
The blob will @@ -89,25 +87,31 @@ class WriteBatch { public: virtual ~Handler(); // default implementation will just call Put without column family for - // backwards compatibility - virtual void PutCF(const ColumnFamilyHandle& column_family, - const Slice& key, const Slice& value) { - Put(key, value); + // backwards compatibility. If the column family is not default, + // the function is noop + virtual void PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { + if (column_family_id == 0) { + Put(key, value); + } } virtual void Put(const Slice& key, const Slice& value); // Merge and LogData are not pure virtual. Otherwise, we would break // existing clients of Handler on a source code level. The default // implementation of Merge simply throws a runtime exception. - virtual void MergeCF(const ColumnFamilyHandle& column_family, - const Slice& key, const Slice& value) { - Merge(key, value); + virtual void MergeCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { + if (column_family_id == 0) { + Merge(key, value); + } } virtual void Merge(const Slice& key, const Slice& value); // The default implementation of LogData does nothing. virtual void LogData(const Slice& blob); - virtual void DeleteCF(const ColumnFamilyHandle& column_family, - const Slice& key) { - Delete(key); + virtual void DeleteCF(uint32_t column_family_id, const Slice& key) { + if (column_family_id == 0) { + Delete(key); + } } virtual void Delete(const Slice& key); // Continue is called by WriteBatch::Iterate. If it returns false,