From d9cd7a063f919d4a57334932e57b31571ce87ddc Mon Sep 17 00:00:00 2001
From: Igor Canadi
Date: Tue, 14 Jan 2014 16:19:09 -0800
Subject: [PATCH] Fix CompactRange to apply filter to every key

Summary:
When doing CompactRange(), we should first flush the memtable and then
calculate max_level_with_files. Also, we want to compact all the levels
that have files, including level `max_level_with_files`.

This patch also fixes the unit test.

Test Plan: Added a unit test that failed before the fix and passes now.

Reviewers: dhruba, haobo, sdong

Reviewed By: haobo

CC: leveldb, xjin

Differential Revision: https://reviews.facebook.net/D14421
---
 db/db_impl.cc                  | 85 +++++++++++++++++++++++-----------
 db/db_impl.h                   | 12 ++++-
 db/db_test.cc                  | 56 +++++++++++++---------
 db/version_set.cc              | 51 +++++++++++++-------
 db/version_set.h               | 16 +++++--
 include/rocksdb/db.h           |  1 +
 util/manual_compaction_test.cc | 75 +++++++++++++++++++++++++++---
 7 files changed, 216 insertions(+), 80 deletions(-)

diff --git a/db/db_impl.cc b/db/db_impl.cc
index 4781ad85d5..908ede5b4a 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -1278,8 +1278,11 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
   return s;
 }
 
-void DBImpl::CompactRange(const Slice* begin, const Slice* end,
-                          bool reduce_level, int target_level) {
+void DBImpl::CompactRange(const Slice* begin,
+                          const Slice* end,
+                          bool reduce_level,
+                          int target_level) {
+  FlushMemTable(FlushOptions());
   int max_level_with_files = 1;
   {
     MutexLock l(&mutex_);
@@ -1290,9 +1293,15 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end,
       }
     }
   }
-  TEST_FlushMemTable(); // TODO(sanjay): Skip if memtable does not overlap
-  for (int level = 0; level < max_level_with_files; level++) {
-    TEST_CompactRange(level, begin, end);
+  for (int level = 0; level <= max_level_with_files; level++) {
+    // In case the compaction is universal, or if we're compacting the
+    // bottom-most level, the output level will be the same as the input one.
+    if (options_.compaction_style == kCompactionStyleUniversal ||
+        level == max_level_with_files) {
+      RunManualCompaction(level, level, begin, end);
+    } else {
+      RunManualCompaction(level, level + 1, begin, end);
+    }
   }
 
   if (reduce_level) {
@@ -1591,13 +1600,17 @@ Status DBImpl::AppendSortedWalsOfType(const std::string& path,
   return status;
 }
 
-void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
-  assert(level >= 0);
+void DBImpl::RunManualCompaction(int input_level,
+                                 int output_level,
+                                 const Slice* begin,
+                                 const Slice* end) {
+  assert(input_level >= 0);
 
   InternalKey begin_storage, end_storage;
 
   ManualCompaction manual;
-  manual.level = level;
+  manual.input_level = input_level;
+  manual.output_level = output_level;
   manual.done = false;
   manual.in_progress = false;
   // For universal compaction, we enforce every manual compaction to compact
@@ -1625,11 +1638,11 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
   // can compact any range of keys/files.
   //
   // bg_manual_only_ is non-zero when at least one thread is inside
-  // TEST_CompactRange(), i.e. during that time no other compaction will
+  // RunManualCompaction(), i.e. during that time no other compaction will
   // get scheduled (see MaybeScheduleFlushOrCompaction).
   //
   // Note that the following loop doesn't stop more than one thread calling
-  // TEST_CompactRange() from getting to the second while loop below.
+  // RunManualCompaction() from getting to the second while loop below.
   // However, only one of them will actually schedule compaction, while
   // others will wait on a condition variable until it completes.
@@ -1659,6 +1672,15 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
   --bg_manual_only_;
 }
 
+void DBImpl::TEST_CompactRange(int level,
+                               const Slice* begin,
+                               const Slice* end) {
+  int output_level = (options_.compaction_style == kCompactionStyleUniversal)
+                         ? level
+                         : level + 1;
+  RunManualCompaction(level, output_level, begin, end);
+}
+
 Status DBImpl::FlushMemTable(const FlushOptions& options) {
   // nullptr batch means just wait for earlier writes to be done
   Status s = Write(WriteOptions(), nullptr);
@@ -1878,23 +1900,27 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
   unique_ptr<Compaction> c;
   bool is_manual = (manual_compaction_ != nullptr) &&
                    (manual_compaction_->in_progress == false);
-  InternalKey manual_end;
+  InternalKey manual_end_storage;
+  InternalKey* manual_end = &manual_end_storage;
   if (is_manual) {
     ManualCompaction* m = manual_compaction_;
     assert(!m->in_progress);
     m->in_progress = true; // another thread cannot pick up the same work
-    c.reset(versions_->CompactRange(m->level, m->begin, m->end));
-    if (c) {
-      manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
-    } else {
+    c.reset(versions_->CompactRange(
+        m->input_level, m->output_level, m->begin, m->end, &manual_end));
+    if (!c) {
       m->done = true;
     }
     Log(options_.info_log,
-        "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
-        m->level,
+        "Manual compaction from level-%d to level-%d from %s .. %s; will stop "
+        "at %s\n",
+        m->input_level,
+        m->output_level,
         (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
         (m->end ? m->end->DebugString().c_str() : "(end)"),
-        (m->done ? "(end)" : manual_end.DebugString().c_str()));
+        ((m->done || manual_end == nullptr)
+             ? "(end)"
+             : manual_end->DebugString().c_str()));
   } else if (!options_.disable_auto_compactions) {
     c.reset(versions_->PickCompaction());
   }
@@ -1959,13 +1985,19 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
     // Also note that, if we don't stop here, then the current compaction
     // writes a new file back to level 0, which will be used in successive
    // compaction. Hence the manual compaction will never finish.
-    if (options_.compaction_style == kCompactionStyleUniversal) {
+    //
+    // Stop the compaction if manual_end points to nullptr -- this means
+    // that we compacted the whole range. manual_end should always point
+    // to nullptr in case of universal compaction.
+    if (manual_end == nullptr) {
       m->done = true;
     }
     if (!m->done) {
       // We only compacted part of the requested range. Update *m
       // to the range that is left to be compacted.
-      m->tmp_storage = manual_end;
+      // Universal compaction should always compact the whole range.
+      assert(options_.compaction_style != kCompactionStyleUniversal);
+      m->tmp_storage = *manual_end;
       m->begin = &m->tmp_storage;
     }
     m->in_progress = false; // not being processed anymore
@@ -1997,14 +2029,14 @@ void DBImpl::CleanupCompaction(CompactionState* compact, Status status) {
 }
 
 // Allocate the file numbers for the output file. We allocate as
-// many output file numbers as there are files in level+1.
+// many output file numbers as there are files in level+1 (at least one).
 // Insert them into pending_outputs so that they do not get deleted.
void DBImpl::AllocateCompactionOutputFileNumbers(CompactionState* compact) { mutex_.AssertHeld(); assert(compact != nullptr); assert(compact->builder == nullptr); int filesNeeded = compact->compaction->num_input_files(1); - for (int i = 0; i < filesNeeded; i++) { + for (int i = 0; i < std::max(filesNeeded, 1); i++) { uint64_t file_number = versions_->NewFileNumber(); pending_outputs_.insert(file_number); compact->allocated_file_numbers.push_back(file_number); @@ -2148,14 +2180,11 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { // Add compaction outputs compact->compaction->AddInputDeletions(compact->compaction->edit()); - const int level = compact->compaction->level(); for (size_t i = 0; i < compact->outputs.size(); i++) { const CompactionState::Output& out = compact->outputs[i]; compact->compaction->edit()->AddFile( - (options_.compaction_style == kCompactionStyleUniversal) ? - level : level + 1, - out.number, out.file_size, out.smallest, out.largest, - out.smallest_seqno, out.largest_seqno); + compact->compaction->output_level(), out.number, out.file_size, + out.smallest, out.largest, out.smallest_seqno, out.largest_seqno); } return versions_->LogAndApply(compact->compaction->edit(), &mutex_); } @@ -2197,7 +2226,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, compact->compaction->num_input_files(0), compact->compaction->level(), compact->compaction->num_input_files(1), - compact->compaction->level() + 1, + compact->compaction->output_level(), compact->compaction->score(), options_.max_background_compactions - bg_compaction_scheduled_); char scratch[256]; diff --git a/db/db_impl.h b/db/db_impl.h index d74b77aa4d..476b2bf549 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -89,10 +89,17 @@ class DBImpl : public DB { virtual Status GetDbIdentity(std::string& identity); + void RunManualCompaction(int input_level, + int output_level, + const Slice* begin, + const Slice* end); + // Extra methods (for testing) that are not in the public DB interface // Compact any files in the named level that overlap [*begin, *end] - void TEST_CompactRange(int level, const Slice* begin, const Slice* end); + void TEST_CompactRange(int level, + const Slice* begin, + const Slice* end); // Force current memtable contents to be flushed. Status TEST_FlushMemTable(); @@ -406,7 +413,8 @@ class DBImpl : public DB { // Information for a manual compaction struct ManualCompaction { - int level; + int input_level; + int output_level; bool done; bool in_progress; // compaction request being processed? 
     const InternalKey* begin;     // nullptr means beginning of key range
diff --git a/db/db_test.cc b/db/db_test.cc
index 91970381fe..9c8a97f936 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -3309,34 +3309,46 @@ TEST(DBTest, ManualCompaction) {
   ASSERT_EQ(dbfull()->MaxMemCompactionLevel(), 2)
       << "Need to update this test to match kMaxMemCompactLevel";
 
-  MakeTables(3, "p", "q");
-  ASSERT_EQ("1,1,1", FilesPerLevel());
+  // iter - 0 with 7 levels
+  // iter - 1 with 3 levels
+  for (int iter = 0; iter < 2; ++iter) {
+    MakeTables(3, "p", "q");
+    ASSERT_EQ("1,1,1", FilesPerLevel());
 
-  // Compaction range falls before files
-  Compact("", "c");
-  ASSERT_EQ("1,1,1", FilesPerLevel());
+    // Compaction range falls before files
+    Compact("", "c");
+    ASSERT_EQ("1,1,1", FilesPerLevel());
 
-  // Compaction range falls after files
-  Compact("r", "z");
-  ASSERT_EQ("1,1,1", FilesPerLevel());
+    // Compaction range falls after files
+    Compact("r", "z");
+    ASSERT_EQ("1,1,1", FilesPerLevel());
 
-  // Compaction range overlaps files
-  Compact("p1", "p9");
-  ASSERT_EQ("0,0,1", FilesPerLevel());
+    // Compaction range overlaps files
+    Compact("p1", "p9");
+    ASSERT_EQ("0,0,1", FilesPerLevel());
 
-  // Populate a different range
-  MakeTables(3, "c", "e");
-  ASSERT_EQ("1,1,2", FilesPerLevel());
+    // Populate a different range
+    MakeTables(3, "c", "e");
+    ASSERT_EQ("1,1,2", FilesPerLevel());
 
-  // Compact just the new range
-  Compact("b", "f");
-  ASSERT_EQ("0,0,2", FilesPerLevel());
+    // Compact just the new range
+    Compact("b", "f");
+    ASSERT_EQ("0,0,2", FilesPerLevel());
+
+    // Compact all
+    MakeTables(1, "a", "z");
+    ASSERT_EQ("0,1,2", FilesPerLevel());
+    db_->CompactRange(nullptr, nullptr);
+    ASSERT_EQ("0,0,1", FilesPerLevel());
+
+    if (iter == 0) {
+      Options options = CurrentOptions();
+      options.num_levels = 3;
+      options.create_if_missing = true;
+      DestroyAndReopen(&options);
+    }
+  }
 
-  // Compact all
-  MakeTables(1, "a", "z");
-  ASSERT_EQ("0,1,2", FilesPerLevel());
-  db_->CompactRange(nullptr, nullptr);
-  ASSERT_EQ("0,0,1", FilesPerLevel());
 }
 
 TEST(DBTest, DBOpen_Options) {
diff --git a/db/version_set.cc b/db/version_set.cc
index 91b3dcd3f0..a411ea2108 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -2715,6 +2715,7 @@ Compaction* VersionSet::PickCompaction() {
 bool VersionSet::ParentRangeInCompaction(const InternalKey* smallest,
                                          const InternalKey* largest,
                                          int level, int* parent_index) {
   std::vector<FileMetaData*> inputs;
+  assert(level + 1 < NumberLevels());
   current_->GetOverlappingInputs(level+1, smallest, largest, &inputs,
                                  *parent_index, parent_index);
@@ -2776,7 +2777,8 @@ void VersionSet::ExpandWhileOverlapping(Compaction* c) {
   // compaction, then we must drop/cancel this compaction.
   int parent_index = -1;
   if (FilesInCompaction(c->inputs_[0]) ||
-      ParentRangeInCompaction(&smallest, &largest, level, &parent_index)) {
+      (c->level() != c->output_level() &&
+       ParentRangeInCompaction(&smallest, &largest, level, &parent_index))) {
     c->inputs_[0].clear();
     c->inputs_[1].clear();
     delete c;
@@ -2790,7 +2792,9 @@ void VersionSet::ExpandWhileOverlapping(Compaction* c) {
 // user-key with another file.
 void VersionSet::SetupOtherInputs(Compaction* c) {
   // If inputs are empty, then there is nothing to expand.
-  if (c->inputs_[0].empty()) {
+  // If both input and output levels are the same, no need to consider
+  // files at level "level+1"
+  if (c->inputs_[0].empty() || c->level() == c->output_level()) {
     return;
   }
 
@@ -2918,11 +2922,13 @@ void VersionSet::GetObsoleteFiles(std::vector<FileMetaData*>* files) {
   obsolete_files_.clear();
 }
 
-Compaction* VersionSet::CompactRange(
-    int level,
-    const InternalKey* begin,
-    const InternalKey* end) {
+Compaction* VersionSet::CompactRange(int input_level,
+                                     int output_level,
+                                     const InternalKey* begin,
+                                     const InternalKey* end,
+                                     InternalKey** compaction_end) {
   std::vector<FileMetaData*> inputs;
+  bool covering_the_whole_range = true;
 
   // All files are 'overlapping' in universal style compaction.
   // We have to compact the entire range in one shot.
@@ -2930,7 +2936,7 @@ Compaction* VersionSet::CompactRange(
     begin = nullptr;
     end = nullptr;
   }
-  current_->GetOverlappingInputs(level, begin, end, &inputs);
+  current_->GetOverlappingInputs(input_level, begin, end, &inputs);
   if (inputs.empty()) {
     return nullptr;
   }
@@ -2939,24 +2945,26 @@ Compaction* VersionSet::CompactRange(
   // But we cannot do this for level-0 since level-0 files can overlap
   // and we must not pick one file and drop another older file if the
   // two files overlap.
-  if (level > 0) {
-    const uint64_t limit = MaxFileSizeForLevel(level) *
-                           options_->source_compaction_factor;
+  if (input_level > 0) {
+    const uint64_t limit =
+        MaxFileSizeForLevel(input_level) * options_->source_compaction_factor;
     uint64_t total = 0;
-    for (size_t i = 0; i < inputs.size(); ++i) {
+    for (size_t i = 0; i + 1 < inputs.size(); ++i) {
       uint64_t s = inputs[i]->file_size;
       total += s;
       if (total >= limit) {
+        **compaction_end = inputs[i + 1]->smallest;
+        covering_the_whole_range = false;
         inputs.resize(i + 1);
         break;
       }
     }
   }
-  int out_level = (options_->compaction_style == kCompactionStyleUniversal) ?
-                  level : level+1;
-
-  Compaction* c = new Compaction(level, out_level, MaxFileSizeForLevel(out_level),
-                                 MaxGrandParentOverlapBytes(level), NumberLevels());
+  Compaction* c = new Compaction(input_level,
+                                 output_level,
+                                 MaxFileSizeForLevel(output_level),
+                                 MaxGrandParentOverlapBytes(input_level),
+                                 NumberLevels());
 
   c->inputs_[0] = inputs;
   ExpandWhileOverlapping(c);
@@ -2969,6 +2977,10 @@ Compaction* VersionSet::CompactRange(
   c->input_version_->Ref();
   SetupOtherInputs(c);
 
+  if (covering_the_whole_range) {
+    *compaction_end = nullptr;
+  }
+
   // These files that are to be manually compacted do not trample
   // upon other files because manual compactions are processed when
   // the system has a max of 1 background compaction thread.
@@ -3016,7 +3028,10 @@ bool Compaction::IsTrivialMove() const {
   // Avoid a move if there is lots of overlapping grandparent data.
   // Otherwise, the move could create a parent file that will require
   // a very expensive merge later on.
-  return (num_input_files(0) == 1 &&
+  // If level_ == out_level_, the purpose is to force the compaction filter
+  // to be applied to that level, and thus it cannot be a trivial move.
+  return (level_ != out_level_ &&
+          num_input_files(0) == 1 &&
           num_input_files(1) == 0 &&
           TotalFileSize(grandparents_) <= maxGrandParentOverlapBytes_);
 }
@@ -3109,7 +3124,7 @@ void Compaction::SetupBottomMostLevel(bool isManual) {
   }
   bottommost_level_ = true;
   int num_levels = input_version_->vset_->NumberLevels();
-  for (int i = level() + 2; i < num_levels; i++) {
+  for (int i = output_level() + 1; i < num_levels; i++) {
     if (input_version_->vset_->NumLevelFiles(i) > 0) {
       bottommost_level_ = false;
       break;
diff --git a/db/version_set.h b/db/version_set.h
index 85ff2ff369..2c91532b5c 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -310,10 +310,18 @@ class VersionSet {
   // the specified level. Returns nullptr if there is nothing in that
   // level that overlaps the specified range. Caller should delete
   // the result.
-  Compaction* CompactRange(
-      int level,
-      const InternalKey* begin,
-      const InternalKey* end);
+  //
+  // The returned Compaction might not include the whole requested range.
+  // In that case, compaction_end will be set to the next key that needs
+  // compacting. If the compaction will compact the whole range,
+  // compaction_end will be set to nullptr.
+  // The client is responsible for compaction_end storage -- when called,
+  // *compaction_end should point to a valid InternalKey!
+  Compaction* CompactRange(int input_level,
+                           int output_level,
+                           const InternalKey* begin,
+                           const InternalKey* end,
+                           InternalKey** compaction_end);
 
   // Return the maximum overlapping data (in bytes) at next level for any
   // file at a level >= 1.
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index dd17d9e9b5..4bf095756c 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -199,6 +199,7 @@ class DB {
                                uint64_t* sizes) = 0;
 
   // Compact the underlying storage for the key range [*begin,*end].
+  // The actual compaction interval might be a superset of [*begin, *end].
   // In particular, deleted and overwritten versions are discarded,
   // and the data is rearranged to reduce the cost of operations
   // needed to access the data. This operation should typically only
diff --git a/util/manual_compaction_test.cc b/util/manual_compaction_test.cc
index ebe1339e53..dd615f0570 100644
--- a/util/manual_compaction_test.cc
+++ b/util/manual_compaction_test.cc
@@ -9,9 +9,13 @@
 #include 
 #include "rocksdb/db.h"
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/slice.h"
 #include "rocksdb/write_batch.h"
 #include "util/testharness.h"
 
+using namespace rocksdb;
+
 namespace {
 
 const int kNumKeys = 1100000;
@@ -26,12 +30,71 @@ std::string Key2(int i) {
   return Key1(i) + "_xxx";
 }
 
-class ManualCompactionTest { };
+class ManualCompactionTest {
+ public:
+  ManualCompactionTest() {
+    // Get rid of any state from an old run.
+ dbname_ = rocksdb::test::TmpDir() + "/rocksdb_cbug_test"; + DestroyDB(dbname_, rocksdb::Options()); + } + + std::string dbname_; +}; + +class DestroyAllCompactionFilter : public CompactionFilter { + public: + DestroyAllCompactionFilter() {} + + virtual bool Filter(int level, + const Slice& key, + const Slice& existing_value, + std::string* new_value, + bool* value_changed) const { + return existing_value.ToString() == "destroy"; + } + + virtual const char* Name() const { + return "DestroyAllCompactionFilter"; + } +}; + +TEST(ManualCompactionTest, CompactTouchesAllKeys) { + for (int iter = 0; iter < 2; ++iter) { + DB* db; + Options options; + if (iter == 0) { // level compaction + options.num_levels = 3; + options.compaction_style = kCompactionStyleLevel; + } else { // universal compaction + options.compaction_style = kCompactionStyleUniversal; + } + options.create_if_missing = true; + options.compression = rocksdb::kNoCompression; + options.compaction_filter = new DestroyAllCompactionFilter(); + ASSERT_OK(DB::Open(options, dbname_, &db)); + + db->Put(WriteOptions(), Slice("key1"), Slice("destroy")); + db->Put(WriteOptions(), Slice("key2"), Slice("destroy")); + db->Put(WriteOptions(), Slice("key3"), Slice("value3")); + db->Put(WriteOptions(), Slice("key4"), Slice("destroy")); + + Slice key4("key4"); + db->CompactRange(nullptr, &key4); + Iterator* itr = db->NewIterator(ReadOptions()); + itr->SeekToFirst(); + ASSERT_TRUE(itr->Valid()); + ASSERT_EQ("key3", itr->key().ToString()); + itr->Next(); + ASSERT_TRUE(!itr->Valid()); + delete itr; + + delete options.compaction_filter; + delete db; + DestroyDB(dbname_, options); + } +} TEST(ManualCompactionTest, Test) { - // Get rid of any state from an old run. - std::string dbpath = rocksdb::test::TmpDir() + "/rocksdb_cbug_test"; - DestroyDB(dbpath, rocksdb::Options()); // Open database. Disable compression since it affects the creation // of layers and the code below is trying to test against a very @@ -40,7 +103,7 @@ TEST(ManualCompactionTest, Test) { rocksdb::Options db_options; db_options.create_if_missing = true; db_options.compression = rocksdb::kNoCompression; - ASSERT_OK(rocksdb::DB::Open(db_options, dbpath, &db)); + ASSERT_OK(rocksdb::DB::Open(db_options, dbname_, &db)); // create first key range rocksdb::WriteBatch batch; @@ -83,7 +146,7 @@ TEST(ManualCompactionTest, Test) { // close database delete db; - DestroyDB(dbpath, rocksdb::Options()); + DestroyDB(dbname_, rocksdb::Options()); } } // anonymous namespace
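
To illustrate what this patch guarantees, here is a minimal, self-contained sketch against the public API of this version of RocksDB: after a full-range CompactRange(), the compaction filter has been applied to every live key, including keys that were still in the memtable and keys already sitting on the bottom-most level. The filter mirrors DestroyAllCompactionFilter from the new test; the database path and keys are illustrative assumptions, not part of the patch.

    #include <cassert>
    #include <string>

    #include "rocksdb/compaction_filter.h"
    #include "rocksdb/db.h"

    // Same idea as DestroyAllCompactionFilter in the new unit test: drop
    // every key whose value is "destroy" when compaction sees it.
    class DestroyAllCompactionFilter : public rocksdb::CompactionFilter {
     public:
      virtual bool Filter(int level, const rocksdb::Slice& key,
                          const rocksdb::Slice& existing_value,
                          std::string* new_value, bool* value_changed) const {
        return existing_value.ToString() == "destroy";
      }
      virtual const char* Name() const { return "DestroyAllCompactionFilter"; }
    };

    int main() {
      rocksdb::Options options;
      options.create_if_missing = true;
      DestroyAllCompactionFilter filter;
      options.compaction_filter = &filter;

      rocksdb::DB* db;
      assert(rocksdb::DB::Open(options, "/tmp/compact_range_example", &db).ok());

      db->Put(rocksdb::WriteOptions(), "key1", "destroy");
      db->Put(rocksdb::WriteOptions(), "key2", "value2");

      // Before this patch, level max_level_with_files was skipped and the
      // memtable was flushed too late, so "key1" could survive. With the
      // patch, a full-range manual compaction runs the filter over all keys.
      db->CompactRange(nullptr, nullptr);

      std::string value;
      assert(db->Get(rocksdb::ReadOptions(), "key1", &value).IsNotFound());
      assert(db->Get(rocksdb::ReadOptions(), "key2", &value).ok());

      delete db;
      return 0;
    }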
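
The other half of the fix, flushing the memtable before computing max_level_with_files, can also be observed from the outside through DB::GetProperty(). This is a sketch under the assumption that the standard "rocksdb.num-files-at-level0" property is available; the path and key count are illustrative:

    #include <cassert>
    #include <string>

    #include "rocksdb/db.h"

    int main() {
      rocksdb::Options options;
      options.create_if_missing = true;

      rocksdb::DB* db;
      assert(rocksdb::DB::Open(options, "/tmp/flush_first_example", &db).ok());

      // These writes live only in the memtable so far; nothing was flushed.
      for (int i = 0; i < 1000; ++i) {
        db->Put(rocksdb::WriteOptions(), "key" + std::to_string(i), "value");
      }

      // The patched CompactRange() calls FlushMemTable() first, so the fresh
      // writes participate in the manual compaction instead of being missed
      // when max_level_with_files is calculated.
      db->CompactRange(nullptr, nullptr);

      std::string num_files_at_l0;
      db->GetProperty("rocksdb.num-files-at-level0", &num_files_at_l0);
      assert(num_files_at_l0 == "0");  // nothing is left behind on level 0

      delete db;
      return 0;
    }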