From e1346968d8b2b32398e31b42bd09fcd2da56a868 Mon Sep 17 00:00:00 2001 From: Deon Nicholas Date: Mon, 19 Aug 2013 11:42:47 -0700 Subject: [PATCH] Merge operator fixes part 1. Summary: -Added null checks and revisions to DBIter::MergeValuesNewToOld() -Added DBIter test to stringappend_test -Major fix with Merge and TTL More plans for fixes later. Test Plan: -make clean; make stringappend_test -j 32; ./stringappend_test -make all check; Reviewers: haobo, emayanke, vamsi, dhruba Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D12315 --- Makefile | 2 +- db/db_iter.cc | 19 ++-- db/memtable.cc | 8 +- db/merge_helper.cc | 18 ++-- db/merge_operator.cc | 2 +- db/version_set.cc | 12 +-- include/leveldb/merge_operator.h | 31 +++--- include/utilities/stackable_db.h | 2 +- utilities/merge_operators/put.cc | 10 +- .../string_append/stringappend.h | 1 - .../string_append/stringappend2.cc | 11 ++- .../string_append/stringappend2.h | 10 +- .../string_append/stringappend_test.cc | 95 +++++++++++++++++++ utilities/ttl/db_ttl.cc | 2 +- utilities/ttl/db_ttl.h | 34 ++++--- 15 files changed, 181 insertions(+), 76 deletions(-) diff --git a/Makefile b/Makefile index d16956dc51..196b88c4cd 100644 --- a/Makefile +++ b/Makefile @@ -20,7 +20,7 @@ include build_config.mk WARNING_FLAGS = -Wall -Werror CFLAGS += -g $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CCFLAGS) $(OPT) -CXXFLAGS += -g $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -std=gnu++0x +CXXFLAGS += -g $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -std=gnu++0x -Woverloaded-virtual LDFLAGS += $(PLATFORM_LDFLAGS) diff --git a/db/db_iter.cc b/db/db_iter.cc index 1d8c84427f..5051b35539 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -212,11 +212,8 @@ void DBIter::FindNextUserEntry(bool skipping) { SaveKey(ikey.user_key, &saved_key_); current_entry_is_merged_ = true; valid_ = true; - // Go to a different state machine - MergeValuesNewToOld(); - // TODO: what if !iter_->Valid() + MergeValuesNewToOld(); // Go to a different state machine return; - break; case kTypeLogData: assert(false); break; @@ -235,7 +232,11 @@ void DBIter::FindNextUserEntry(bool skipping) { // POST: saved_value_ has the merged value for the user key // iter_ points to the next entry (or invalid) void DBIter::MergeValuesNewToOld() { - // TODO: Is there a way to unite with MergeHelper or other similar code? + if (!user_merge_operator_) { + Log(logger_, "Options::merge_operator is null."); + throw std::logic_error("DBIter::MergeValuesNewToOld() with" + " Options::merge_operator null"); + } // Start the merge process by pushing the first operand std::deque operands; @@ -266,8 +267,8 @@ void DBIter::MergeValuesNewToOld() { // final result in saved_value_. We are done! // ignore corruption if there is any. const Slice value = iter_->value(); - user_merge_operator_->Merge(ikey.user_key, &value, operands, - &saved_value_, logger_.get()); + user_merge_operator_->FullMerge(ikey.user_key, &value, operands, + &saved_value_, logger_.get()); // iter_ is positioned after put iter_->Next(); return; @@ -300,8 +301,8 @@ void DBIter::MergeValuesNewToOld() { // a deletion marker. // feed null as the existing value to the merge operator, such that // client can differentiate this scenario and do things accordingly. - user_merge_operator_->Merge(ikey.user_key, nullptr, operands, - &saved_value_, logger_.get()); + user_merge_operator_->FullMerge(saved_key_, nullptr, operands, + &saved_value_, logger_.get()); } void DBIter::Prev() { diff --git a/db/memtable.cc b/db/memtable.cc index 24d6bd8dd2..37ab558521 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -167,8 +167,8 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, *s = Status::OK(); if (merge_in_progress) { assert(merge_operator); - if (!merge_operator->Merge(key.user_key(), &v, *operands, - value, logger.get())) { + if (!merge_operator->FullMerge(key.user_key(), &v, *operands, + value, logger.get())) { RecordTick(options.statistics, NUMBER_MERGE_FAILURES); *s = Status::Corruption("Error: Could not perform merge."); } @@ -181,8 +181,8 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, if (merge_in_progress) { assert(merge_operator); *s = Status::OK(); - if (!merge_operator->Merge(key.user_key(), nullptr, *operands, - value, logger.get())) { + if (!merge_operator->FullMerge(key.user_key(), nullptr, *operands, + value, logger.get())) { RecordTick(options.statistics, NUMBER_MERGE_FAILURES); *s = Status::Corruption("Error: Could not perform merge."); } diff --git a/db/merge_helper.cc b/db/merge_helper.cc index d1f3c6683d..32d4874cbe 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -67,9 +67,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, // => store result in operands_.back() (and update keys_.back()) // => change the entry type to kTypeValue for keys_.back() // We are done! Return a success if the merge passes. - success_ = user_merge_operator_->Merge(ikey.user_key, nullptr, - operands_, &merge_result, - logger_); + success_ = user_merge_operator_->FullMerge(ikey.user_key, nullptr, + operands_, &merge_result, + logger_); // We store the result in keys_.back() and operands_.back() // if nothing went wrong (i.e.: no operand corruption on disk) @@ -95,9 +95,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, // => change the entry type to kTypeValue for keys_.back() // We are done! Success! const Slice value = iter->value(); - success_ = user_merge_operator_->Merge(ikey.user_key, &value, - operands_, &merge_result, - logger_); + success_ = user_merge_operator_->FullMerge(ikey.user_key, &value, + operands_, &merge_result, + logger_); // We store the result in keys_.back() and operands_.back() // if nothing went wrong (i.e.: no operand corruption on disk) @@ -170,9 +170,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, assert(kTypeMerge == orig_ikey.type); assert(operands_.size() >= 1); assert(operands_.size() == keys_.size()); - success_ = user_merge_operator_->Merge(ikey.user_key, nullptr, - operands_, &merge_result, - logger_); + success_ = user_merge_operator_->FullMerge(ikey.user_key, nullptr, + operands_, &merge_result, + logger_); if (success_) { std::string& key = keys_.back(); // The original key encountered diff --git a/db/merge_operator.cc b/db/merge_operator.cc index 89c64b5d99..aee99128a5 100644 --- a/db/merge_operator.cc +++ b/db/merge_operator.cc @@ -12,7 +12,7 @@ namespace leveldb { // Given a "real" merge from the library, call the user's // associative merge function one-by-one on each of the operands. // NOTE: It is assumed that the client's merge-operator will handle any errors. -bool AssociativeMergeOperator::Merge( +bool AssociativeMergeOperator::FullMerge( const Slice& key, const Slice* existing_value, const std::deque& operand_list, diff --git a/db/version_set.cc b/db/version_set.cc index 63a58f6713..609965224e 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -286,8 +286,8 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ } else if (kMerge == s->state) { assert(s->merge_operator != nullptr); s->state = kFound; - if (!s->merge_operator->Merge(s->user_key, &v, *ops, - s->value, s->logger)) { + if (!s->merge_operator->FullMerge(s->user_key, &v, *ops, + s->value, s->logger)) { RecordTick(s->statistics, NUMBER_MERGE_FAILURES); s->state = kCorrupt; } @@ -301,8 +301,8 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ s->state = kDeleted; } else if (kMerge == s->state) { s->state = kFound; - if (!s->merge_operator->Merge(s->user_key, nullptr, *ops, - s->value, s->logger)) { + if (!s->merge_operator->FullMerge(s->user_key, nullptr, *ops, + s->value, s->logger)) { RecordTick(s->statistics, NUMBER_MERGE_FAILURES); s->state = kCorrupt; } @@ -521,8 +521,8 @@ void Version::Get(const ReadOptions& options, if (kMerge == saver.state) { // merge_operands are in saver and we hit the beginning of the key history // do a final merge of nullptr and operands; - if (merge_operator->Merge(user_key, nullptr, *saver.merge_operands, - value, logger.get())) { + if (merge_operator->FullMerge(user_key, nullptr, *saver.merge_operands, + value, logger.get())) { *status = Status::OK(); } else { RecordTick(db_options.statistics, NUMBER_MERGE_FAILURES); diff --git a/include/leveldb/merge_operator.h b/include/leveldb/merge_operator.h index 44c4db66ba..e64418aee5 100644 --- a/include/leveldb/merge_operator.h +++ b/include/leveldb/merge_operator.h @@ -29,10 +29,11 @@ class Logger; // into rocksdb); numeric addition and string concatenation are examples; // // b) MergeOperator - the generic class for all the more abstract / complex -// operations; one method to merge a Put/Delete value with a merge operand; -// and another method (PartialMerge) that merges two operands together. -// this is especially useful if your key values have a complex structure, -// but you would still like to support client-specific incremental updates. +// operations; one method (FullMerge) to merge a Put/Delete value with a +// merge operand; and another method (PartialMerge) that merges two +// operands together. this is especially useful if your key values have a +// complex structure but you would still like to support client-specific +// incremental updates. // // AssociativeMergeOperator is simpler to implement. MergeOperator is simply // more powerful. @@ -60,11 +61,11 @@ class MergeOperator { // internal corruption. This will be treated as an error by the library. // // Also make use of the *logger for error messages. - virtual bool Merge(const Slice& key, - const Slice* existing_value, - const std::deque& operand_list, - std::string* new_value, - Logger* logger) const = 0; + virtual bool FullMerge(const Slice& key, + const Slice* existing_value, + const std::deque& operand_list, + std::string* new_value, + Logger* logger) const = 0; // This function performs merge(left_op, right_op) // when both the operands are themselves merge operation types @@ -85,7 +86,7 @@ class MergeOperator { // TODO: Presently there is no way to differentiate between error/corruption // and simply "return false". For now, the client should simply return // false in any case it cannot perform partial-merge, regardless of reason. - // If there is corruption in the data, handle it in the above Merge() function, + // If there is corruption in the data, handle it in the FullMerge() function, // and return false there. virtual bool PartialMerge(const Slice& key, const Slice& left_operand, @@ -128,11 +129,11 @@ class AssociativeMergeOperator : public MergeOperator { private: // Default implementations of the MergeOperator functions - virtual bool Merge(const Slice& key, - const Slice* existing_value, - const std::deque& operand_list, - std::string* new_value, - Logger* logger) const override; + virtual bool FullMerge(const Slice& key, + const Slice* existing_value, + const std::deque& operand_list, + std::string* new_value, + Logger* logger) const override; virtual bool PartialMerge(const Slice& key, const Slice& left_operand, diff --git a/include/utilities/stackable_db.h b/include/utilities/stackable_db.h index 28d5a7701b..916496e049 100644 --- a/include/utilities/stackable_db.h +++ b/include/utilities/stackable_db.h @@ -60,7 +60,7 @@ class StackableDB : public DB { const Slice& key, std::string* value, bool* value_found = nullptr) override { - return KeyMayExist(options, key, value, value_found); + return sdb_->KeyMayExist(options, key, value, value_found); } virtual Status Delete(const WriteOptions& wopts, const Slice& key) override { diff --git a/utilities/merge_operators/put.cc b/utilities/merge_operators/put.cc index 123dce66a6..39da05a7e1 100644 --- a/utilities/merge_operators/put.cc +++ b/utilities/merge_operators/put.cc @@ -17,11 +17,11 @@ namespace { // anonymous namespace // From the client-perspective, semantics are the same. class PutOperator : public MergeOperator { public: - virtual bool Merge(const Slice& key, - const Slice* existing_value, - const std::deque& operand_sequence, - std::string* new_value, - Logger* logger) const override { + virtual bool FullMerge(const Slice& key, + const Slice* existing_value, + const std::deque& operand_sequence, + std::string* new_value, + Logger* logger) const override { // Put basically only looks at the current/latest value assert(!operand_sequence.empty()); assert(new_value != nullptr); diff --git a/utilities/merge_operators/string_append/stringappend.h b/utilities/merge_operators/string_append/stringappend.h index 34cca69449..3787773a80 100644 --- a/utilities/merge_operators/string_append/stringappend.h +++ b/utilities/merge_operators/string_append/stringappend.h @@ -11,7 +11,6 @@ namespace leveldb { class StringAppendOperator : public AssociativeMergeOperator { public: - StringAppendOperator(char delim_char); /// Constructor: specify delimiter virtual bool Merge(const Slice& key, diff --git a/utilities/merge_operators/string_append/stringappend2.cc b/utilities/merge_operators/string_append/stringappend2.cc index 93e65a7b81..c409a1105d 100644 --- a/utilities/merge_operators/string_append/stringappend2.cc +++ b/utilities/merge_operators/string_append/stringappend2.cc @@ -20,11 +20,12 @@ StringAppendTESTOperator::StringAppendTESTOperator(char delim_char) } // Implementation for the merge operation (concatenates two strings) -bool StringAppendTESTOperator::Merge(const Slice& key, - const Slice* existing_value, - const std::deque& operands, - std::string* new_value, - Logger* logger) const { +bool StringAppendTESTOperator::FullMerge( + const Slice& key, + const Slice* existing_value, + const std::deque& operands, + std::string* new_value, + Logger* logger) const { // Clear the *new_value for writing. assert(new_value); diff --git a/utilities/merge_operators/string_append/stringappend2.h b/utilities/merge_operators/string_append/stringappend2.h index 2e9f505bd1..a8f090ffd3 100644 --- a/utilities/merge_operators/string_append/stringappend2.h +++ b/utilities/merge_operators/string_append/stringappend2.h @@ -20,11 +20,11 @@ class StringAppendTESTOperator : public MergeOperator { StringAppendTESTOperator(char delim_char); /// Constructor with delimiter - virtual bool Merge(const Slice& key, - const Slice* existing_value, - const std::deque& operand_sequence, - std::string* new_value, - Logger* logger) const override; + virtual bool FullMerge(const Slice& key, + const Slice* existing_value, + const std::deque& operand_sequence, + std::string* new_value, + Logger* logger) const override; virtual bool PartialMerge(const Slice& key, const Slice& left_operand, diff --git a/utilities/merge_operators/string_append/stringappend_test.cc b/utilities/merge_operators/string_append/stringappend_test.cc index b852d30d9c..5ec30a4c40 100644 --- a/utilities/merge_operators/string_append/stringappend_test.cc +++ b/utilities/merge_operators/string_append/stringappend_test.cc @@ -96,6 +96,101 @@ class StringLists { // THE TEST CASES BEGIN HERE class StringAppendOperatorTest { }; +TEST(StringAppendOperatorTest, IteratorTest) { + DestroyDB(kDbName, Options()); // Start this test with a fresh DB + + StringAppendOperator append_op(','); + auto db_ = OpenDb(&append_op); + StringLists slists(db_); + + slists.Append("k1","v1"); + slists.Append("k1","v2"); + slists.Append("k1","v3"); + + slists.Append("k2","a1"); + slists.Append("k2","a2"); + slists.Append("k2","a3"); + + std::string res; + std::unique_ptr it(db_->NewIterator(ReadOptions())); + std::string k1("k1"); + std::string k2("k2"); + bool first = true; + for (it->Seek(k1); it->Valid(); it->Next()) { + res = it->value().ToString(); + if (first) { + ASSERT_EQ(res, "v1,v2,v3"); + first = false; + } else { + ASSERT_EQ(res, "a1,a2,a3"); + } + } + slists.Append("k2", "a4"); + slists.Append("k1", "v4"); + + // Snapshot should still be the same. Should ignore a4 and v4. + first = true; + for (it->Seek(k1); it->Valid(); it->Next()) { + res = it->value().ToString(); + if (first) { + ASSERT_EQ(res, "v1,v2,v3"); + first = false; + } else { + ASSERT_EQ(res, "a1,a2,a3"); + } + } + + + // Should release the snapshot and be aware of the new stuff now + it.reset(db_->NewIterator(ReadOptions())); + first = true; + for (it->Seek(k1); it->Valid(); it->Next()) { + res = it->value().ToString(); + if (first) { + ASSERT_EQ(res, "v1,v2,v3,v4"); + first = false; + } else { + ASSERT_EQ(res, "a1,a2,a3,a4"); + } + } + + // start from k2 this time. + for (it->Seek(k2); it->Valid(); it->Next()) { + res = it->value().ToString(); + if (first) { + ASSERT_EQ(res, "v1,v2,v3,v4"); + first = false; + } else { + ASSERT_EQ(res, "a1,a2,a3,a4"); + } + } + + slists.Append("k3","g1"); + + it.reset(db_->NewIterator(ReadOptions())); + first = true; + std::string k3("k3"); + for(it->Seek(k2); it->Valid(); it->Next()) { + res = it->value().ToString(); + if (first) { + ASSERT_EQ(res, "a1,a2,a3,a4"); + first = false; + } else { + ASSERT_EQ(res, "g1"); + } + } + for(it->Seek(k3); it->Valid(); it->Next()) { + res = it->value().ToString(); + if (first) { + // should not be hit + ASSERT_EQ(res, "a1,a2,a3,a4"); + first = false; + } else { + ASSERT_EQ(res, "g1"); + } + } + +} TEST(StringAppendOperatorTest,SimpleTest) { DestroyDB(kDbName, Options()); // Start this test with a fresh DB diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc index 0791cdc878..b6235ba4a4 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -154,7 +154,7 @@ std::vector DBWithTTL::MultiGet(const ReadOptions& options, supported with TTL")); } -bool DBWithTTL::KeyMayExist(ReadOptions& options, +bool DBWithTTL::KeyMayExist(const ReadOptions& options, const Slice& key, std::string* value, bool* value_found) { diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index ada7f99bac..1285b448e2 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -36,10 +36,10 @@ class DBWithTTL : public StackableDB { const std::vector& keys, std::vector* values); - virtual bool KeyMayExist(ReadOptions& options, + virtual bool KeyMayExist(const ReadOptions& options, const Slice& key, std::string* value, - bool* value_found = nullptr); + bool* value_found = nullptr) override; virtual Status Delete(const WriteOptions& wopts, const Slice& key); @@ -259,11 +259,11 @@ class TtlMergeOperator : public MergeOperator { assert(merge_op); } - virtual bool Merge(const Slice& key, - const Slice* existing_value, - const std::deque& operands, - std::string* new_value, - Logger* logger) const override { + virtual bool FullMerge(const Slice& key, + const Slice* existing_value, + const std::deque& operands, + std::string* new_value, + Logger* logger) const override { const uint32_t ts_len = DBWithTTL::kTSLength; if (existing_value && existing_value->size() < ts_len) { Log(logger, "Error: Could not remove timestamp from existing value."); @@ -281,14 +281,20 @@ class TtlMergeOperator : public MergeOperator { } // Apply the user merge operator (store result in *new_value) + bool good = true; if (existing_value) { Slice existing_value_without_ts(existing_value->data(), existing_value->size() - ts_len); - user_merge_op_->Merge(key, &existing_value_without_ts, - operands_without_ts, new_value, logger); + good = user_merge_op_->FullMerge(key, &existing_value_without_ts, + operands_without_ts, new_value, logger); } else { - user_merge_op_->Merge(key, nullptr, operands_without_ts, new_value, - logger); + good = user_merge_op_->FullMerge(key, nullptr, operands_without_ts, + new_value, logger); + } + + // Return false if the user merge operator returned false + if (!good) { + return false; } // Augment the *new_value with the ttl time-stamp @@ -321,8 +327,10 @@ class TtlMergeOperator : public MergeOperator { assert(new_value); Slice left_without_ts(left_operand.data(), left_operand.size() - ts_len); Slice right_without_ts(right_operand.data(), right_operand.size() - ts_len); - user_merge_op_->PartialMerge(key, left_without_ts, right_without_ts, - new_value, logger); + if (!user_merge_op_->PartialMerge(key, left_without_ts, right_without_ts, + new_value, logger)) { + return false; + } // Augment the *new_value with the ttl time-stamp int32_t curtime;