diff --git a/db/db_impl.cc b/db/db_impl.cc
index 1bdd61d39c..03d88708a2 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -182,7 +182,9 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
       stats_(options.num_levels),
       delayed_writes_(0),
       last_flushed_sequence_(0),
-      storage_options_(options) {
+      storage_options_(options),
+      bg_work_gate_closed_(false),
+      refitting_level_(false) {
 
   mem_->Ref();
 
@@ -900,7 +902,8 @@ Status DBImpl::CompactMemTable(bool* madeProgress) {
   return s;
 }
 
-void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
+void DBImpl::CompactRange(const Slice* begin, const Slice* end,
+                          bool reduce_level) {
   int max_level_with_files = 1;
   {
     MutexLock l(&mutex_);
@@ -915,6 +918,78 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
   for (int level = 0; level < max_level_with_files; level++) {
     TEST_CompactRange(level, begin, end);
   }
+
+  if (reduce_level) {
+    ReFitLevel(max_level_with_files);
+  }
+}
+
+// Returns `level` itself if no smaller empty level can hold its data.
+int DBImpl::FindMinimumEmptyLevelFitting(int level) {
+  mutex_.AssertHeld();
+  int minimum_level = level;
+  for (int i = level - 1; i > 0; --i) {  // scan toward level 1 (never level 0)
+    // stop if level i is not empty
+    if (versions_->NumLevelFiles(i) > 0) break;
+
+    // stop if level i is too small (cannot fit the level files)
+    if (versions_->MaxBytesForLevel(i) < versions_->NumLevelBytes(level)) break;
+
+    minimum_level = i;
+  }
+  return minimum_level;
+}
+
+void DBImpl::ReFitLevel(int level) {
+  assert(level < NumberLevels());
+
+  MutexLock l(&mutex_);
+
+  // only allow one thread refitting
+  if (refitting_level_) {
+    Log(options_.info_log, "ReFitLevel: another thread is refitting");
+    return;
+  }
+  refitting_level_ = true;
+
+  // block new background work, then wait for scheduled work to finish
+  bg_work_gate_closed_ = true;
+  while (bg_compaction_scheduled_ > 0) {
+    Log(options_.info_log,
+        "RefitLevel: waiting for background threads to stop: %d",
+        bg_compaction_scheduled_);
+    bg_cv_.Wait();
+  }
+
+  // pick the target level; equals `level` when no smaller level fits
+  int to_level = FindMinimumEmptyLevelFitting(level);
+
+  assert(to_level <= level);
+
+  if (to_level < level) {
+    Log(options_.info_log, "Before refitting:\n%s",
+        versions_->current()->DebugString().data());
+
+    VersionEdit edit(NumberLevels());
+    for (const auto& f : versions_->current()->files_[level]) {
+      edit.DeleteFile(level, f->number);
+      edit.AddFile(to_level, f->number, f->file_size, f->smallest, f->largest);
+    }
+    Log(options_.info_log, "Apply version edit:\n%s",
+        edit.DebugString().data());
+
+    auto status = versions_->LogAndApply(&edit, &mutex_);
+
+    Log(options_.info_log, "LogAndApply: %s\n", status.ToString().data());
+
+    if (status.ok()) {
+      Log(options_.info_log, "After refitting:\n%s",
+          versions_->current()->DebugString().data());
+    }
+  }
+
+  refitting_level_ = false;
+  bg_work_gate_closed_ = false;
 }
 
 int DBImpl::NumberLevels() {
@@ -1238,7 +1313,9 @@ Status DBImpl::TEST_WaitForCompact() {
 
 void DBImpl::MaybeScheduleCompaction() {
   mutex_.AssertHeld();
-  if (bg_compaction_scheduled_ >= options_.max_background_compactions) {
+  if (bg_work_gate_closed_) {
+    // gate closed for background work
+  } else if (bg_compaction_scheduled_ >= options_.max_background_compactions) {
     // Already scheduled
   } else if (shutting_down_.Acquire_Load()) {
     // DB is being deleted; no more background compactions
diff --git a/db/db_impl.h b/db/db_impl.h
index 5f09035f2c..157acd9b82 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -57,7 +57,8 @@ class DBImpl : public DB {
   virtual void ReleaseSnapshot(const Snapshot* snapshot);
   virtual bool GetProperty(const Slice& property, std::string* value);
   virtual void GetApproximateSizes(const Range* range, int n, uint64_t* sizes);
-  virtual void CompactRange(const Slice* begin, const Slice* end);
+  virtual void CompactRange(const Slice* begin, const Slice* end,
+                            bool reduce_level = false);
   virtual int NumberLevels();
   virtual int MaxMemCompactionLevel();
   virtual int Level0StopWriteTrigger();
@@ -221,6 +222,14 @@ class DBImpl : public DB {
   // dump leveldb.stats to LOG
   void MaybeDumpStats();
 
+  // Return the minimum empty level that could hold the total data in the
+  // input level. Return the input level, if such level could not be found.
+  int FindMinimumEmptyLevelFitting(int level);
+
+  // Move the files in the input level to the minimum level that could hold
+  // the data set.
+  void ReFitLevel(int level);
+
   // Constant after construction
   const InternalFilterPolicy internal_filter_policy_;
   bool owns_info_log_;
@@ -370,6 +379,12 @@ class DBImpl : public DB {
   // The options to access storage files
   const EnvOptions storage_options_;
 
+  // A value of true temporarily disables scheduling of background work
+  bool bg_work_gate_closed_;
+
+  // Guard against multiple concurrent refitting
+  bool refitting_level_;
+
   // No copying allowed
   DBImpl(const DBImpl&);
   void operator=(const DBImpl&);
diff --git a/db/db_impl_readonly.h b/db/db_impl_readonly.h
index 317d290d03..6199b5e7bb 100644
--- a/db/db_impl_readonly.h
+++ b/db/db_impl_readonly.h
@@ -47,7 +47,8 @@ public:
   virtual Status Write(const WriteOptions& options, WriteBatch* updates) {
     return Status::NotSupported("Not supported operation in read only mode.");
   }
-  virtual void CompactRange(const Slice* begin, const Slice* end) {
+  virtual void CompactRange(const Slice* begin, const Slice* end,
+                            bool reduce_level = false) {
   }
   virtual Status DisableFileDeletions() {
     return Status::NotSupported("Not supported operation in read only mode.");
diff --git a/db/db_test.cc b/db/db_test.cc
index 52c6ad7949..acdcd41e2b 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -3058,7 +3058,8 @@ class ModelDB: public DB {
       sizes[i] = 0;
     }
   }
-  virtual void CompactRange(const Slice* start, const Slice* end) {
+  virtual void CompactRange(const Slice* start, const Slice* end,
+                            bool reduce_level ) {
   }
 
   virtual int NumberLevels()
diff --git a/db/version_set.h b/db/version_set.h
index 2369bcc1ea..ec7a2750e5 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -136,6 +136,7 @@ class Version {
  private:
   friend class Compaction;
   friend class VersionSet;
+  friend class DBImpl;
 
   class LevelFileNumIterator;
   Iterator* NewConcatenatingIterator(const ReadOptions&,
diff --git a/include/leveldb/db.h b/include/leveldb/db.h
index 056920d9ec..b7efeb54bd 100644
--- a/include/leveldb/db.h
+++ b/include/leveldb/db.h
@@ -180,7 +180,14 @@ class DB {
   // end==nullptr is treated as a key after all keys in the database.
   // Therefore the following call will compact the entire database:
   //    db->CompactRange(nullptr, nullptr);
-  virtual void CompactRange(const Slice* begin, const Slice* end) = 0;
+  // Note that after the entire database is compacted, all data are pushed
+  // down to the last level containing any data. If the total data size
+  // after compaction is reduced, that level might not be appropriate for
+  // hosting all the files. In this case, client could set reduce_level
+  // to true, to move the files back to the minimum level capable of holding
+  // the data set.
+  virtual void CompactRange(const Slice* begin, const Slice* end,
+                            bool reduce_level = false) = 0;
 
   // Number of levels used for this DB.
   virtual int NumberLevels() = 0;
diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc
index eff675340a..7dfda72078 100644
--- a/utilities/ttl/db_ttl.cc
+++ b/utilities/ttl/db_ttl.cc
@@ -223,8 +223,9 @@ void DBWithTTL::GetApproximateSizes(const Range* r, int n, uint64_t* sizes) {
   db_->GetApproximateSizes(r, n, sizes);
 }
 
-void DBWithTTL::CompactRange(const Slice* begin, const Slice* end) {
-  db_->CompactRange(begin, end);
+void DBWithTTL::CompactRange(const Slice* begin, const Slice* end,
+                             bool reduce_level) {
+  db_->CompactRange(begin, end, reduce_level);
 }
 
 int DBWithTTL::NumberLevels() {
diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h
index d24efbe489..d66e396cae 100644
--- a/utilities/ttl/db_ttl.h
+++ b/utilities/ttl/db_ttl.h
@@ -54,7 +54,8 @@ class DBWithTTL : public DB, CompactionFilter {
 
   virtual void GetApproximateSizes(const Range* r, int n, uint64_t* sizes);
 
-  virtual void CompactRange(const Slice* begin, const Slice* end);
+  virtual void CompactRange(const Slice* begin, const Slice* end,
+                            bool reduce_level = false);
 
   virtual int NumberLevels();
 