diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index c4e5719c38..958d2797ac 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -551,7 +551,7 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version) { return nullptr; } Version::FileSummaryStorage tmp; - Log(options_->info_log, "Universal: candidate files(%lu): %s\n", + Log(options_->info_log, "Universal: candidate files(%zu): %s\n", version->files_[level].size(), version->LevelFileSummary(&tmp, 0)); diff --git a/db/db_bench.cc b/db/db_bench.cc index 18258fbb6f..8355a3f0c2 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -99,6 +99,7 @@ DEFINE_string(benchmarks, "Must be used with merge_operator\n" "\treadrandommergerandom -- perform N random read-or-merge " "operations. Must be used with merge_operator\n" + "\tnewiterator -- repeated iterator creation\n" "\tseekrandom -- N random seeks\n" "\tcrc32c -- repeated crc32c of 4K of data\n" "\tacquireload -- load N*1000 times\n" @@ -1089,6 +1090,8 @@ class Benchmark { method = &Benchmark::ReadRandom; } else if (name == Slice("readmissing")) { method = &Benchmark::ReadMissing; + } else if (name == Slice("newiterator")) { + method = &Benchmark::IteratorCreation; } else if (name == Slice("seekrandom")) { method = &Benchmark::SeekRandom; } else if (name == Slice("readhot")) { @@ -1526,14 +1529,14 @@ class Benchmark { ++count; char* tab = std::find(linep, linep + bufferLen, columnSeparator); if (tab == linep + bufferLen) { - fprintf(stderr, "[Error] No Key delimiter TAB at line %ld\n", count); + fprintf(stderr, "[Error] No Key delimiter TAB at line %zu\n", count); continue; } Slice key(linep, tab - linep); tab++; char* endLine = std::find(tab, linep + bufferLen, lineSeparator); if (endLine == linep + bufferLen) { - fprintf(stderr, "[Error] No ENTER at end of line # %ld\n", count); + fprintf(stderr, "[Error] No ENTER at end of line # %zu\n", count); continue; } Slice value(tab, endLine - tab); @@ -1877,6 +1880,16 @@ class 
Benchmark { thread->stats.AddMessage(msg); } + void IteratorCreation(ThreadState* thread) { + Duration duration(FLAGS_duration, reads_); + ReadOptions options(FLAGS_verify_checksum, true); + while (!duration.Done(1)) { + Iterator* iter = db_->NewIterator(options); + delete iter; + thread->stats.FinishedSingleOp(db_); + } + } + void SeekRandom(ThreadState* thread) { Duration duration(FLAGS_duration, reads_); ReadOptions options(FLAGS_verify_checksum, true); diff --git a/db/db_impl.cc b/db/db_impl.cc index a5b9f99ba0..9d90569079 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2695,34 +2695,29 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, namespace { struct IterState { + IterState(DBImpl* db, port::Mutex* mu, SuperVersion* super_version) + : db(db), mu(mu), super_version(super_version) {} + + DBImpl* db; port::Mutex* mu; - Version* version = nullptr; - MemTable* mem = nullptr; - MemTableListVersion* imm = nullptr; - DBImpl *db; + SuperVersion* super_version; }; static void CleanupIteratorState(void* arg1, void* arg2) { IterState* state = reinterpret_cast<IterState*>(arg1); DBImpl::DeletionState deletion_state(state->db->GetOptions(). 
max_write_buffer_number); - state->mu->Lock(); - if (state->mem) { // not set for immutable iterator - MemTable* m = state->mem->Unref(); - if (m != nullptr) { - deletion_state.memtables_to_free.push_back(m); - } + + bool need_cleanup = state->super_version->Unref(); + if (need_cleanup) { + state->mu->Lock(); + state->super_version->Cleanup(); + state->db->FindObsoleteFiles(deletion_state, false, true); + state->mu->Unlock(); + + delete state->super_version; + state->db->PurgeObsoleteFiles(deletion_state); } - if (state->version) { // not set for memtable-only iterator - state->version->Unref(); - } - if (state->imm) { // not set for memtable-only iterator - state->imm->Unref(&deletion_state.memtables_to_free); - } - // fast path FindObsoleteFiles - state->db->FindObsoleteFiles(deletion_state, false, true); - state->mu->Unlock(); - state->db->PurgeObsoleteFiles(deletion_state); delete state; } @@ -2730,36 +2725,24 @@ static void CleanupIteratorState(void* arg1, void* arg2) { Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, SequenceNumber* latest_snapshot) { - IterState* cleanup = new IterState; - MemTable* mutable_mem; - MemTableListVersion* immutable_mems; - Version* version; - - // Collect together all needed child iterators for mem mutex_.Lock(); *latest_snapshot = versions_->LastSequence(); - mutable_mem = default_cfd_->mem(); - mutable_mem->Ref(); - immutable_mems = default_cfd_->imm()->current(); - immutable_mems->Ref(); - version = default_cfd_->current(); - version->Ref(); + SuperVersion* super_version = default_cfd_->GetSuperVersion()->Ref(); mutex_.Unlock(); std::vector<Iterator*> iterator_list; - iterator_list.push_back(mutable_mem->NewIterator(options)); - cleanup->mem = mutable_mem; - cleanup->imm = immutable_mems; + // Collect iterator for mutable mem + iterator_list.push_back(super_version->mem->NewIterator(options)); // Collect all needed child iterators for immutable memtables - immutable_mems->AddIterators(options, &iterator_list); + 
super_version->imm->AddIterators(options, &iterator_list); // Collect iterators for files in L0 - Ln - version->AddIterators(options, storage_options_, &iterator_list); + super_version->current->AddIterators(options, storage_options_, + &iterator_list); Iterator* internal_iter = NewMergingIterator(&default_cfd_->internal_comparator(), &iterator_list[0], iterator_list.size()); - cleanup->version = version; - cleanup->mu = &mutex_; - cleanup->db = this; + + IterState* cleanup = new IterState(this, &mutex_, super_version); internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr); return internal_iter; @@ -2774,53 +2757,36 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair( const ReadOptions& options, uint64_t* superversion_number) { - MemTable* mutable_mem; - MemTableListVersion* immutable_mems; - Version* version; - - // get all child iterators and bump their refcounts under lock mutex_.Lock(); - mutable_mem = default_cfd_->mem(); - mutable_mem->Ref(); - immutable_mems = default_cfd_->imm()->current(); - immutable_mems->Ref(); - version = default_cfd_->current(); - version->Ref(); + SuperVersion* super_version = default_cfd_->GetSuperVersion()->Ref(); if (superversion_number != nullptr) { *superversion_number = CurrentVersionNumber(); } mutex_.Unlock(); - Iterator* mutable_iter = mutable_mem->NewIterator(options); - IterState* mutable_cleanup = new IterState(); - mutable_cleanup->mem = mutable_mem; - mutable_cleanup->db = this; - mutable_cleanup->mu = &mutex_; - mutable_iter->RegisterCleanup(CleanupIteratorState, mutable_cleanup, nullptr); - Iterator* mutable_iter = super_version->mem->NewIterator(options); // create a DBIter that only uses memtable content; see NewIterator() mutable_iter = NewDBIterator(&dbname_, env_, options_, user_comparator(), mutable_iter, kMaxSequenceNumber); - Iterator* immutable_iter; - IterState* immutable_cleanup = new IterState(); std::vector<Iterator*> list; - immutable_mems->AddIterators(options, &list); - immutable_cleanup->imm = 
immutable_mems; - version->AddIterators(options, storage_options_, &list); - immutable_cleanup->version = version; - immutable_cleanup->db = this; - immutable_cleanup->mu = &mutex_; - - immutable_iter = NewMergingIterator(&default_cfd_->internal_comparator(), - &list[0], list.size()); - immutable_iter->RegisterCleanup(CleanupIteratorState, immutable_cleanup, - nullptr); + super_version->imm->AddIterators(options, &list); + super_version->current->AddIterators(options, storage_options_, &list); + Iterator* immutable_iter = NewMergingIterator( + &default_cfd_->internal_comparator(), &list[0], list.size()); // create a DBIter that only uses memtable content; see NewIterator() immutable_iter = NewDBIterator(&dbname_, env_, options_, user_comparator(), immutable_iter, kMaxSequenceNumber); + // register cleanups + mutable_iter->RegisterCleanup(CleanupIteratorState, + new IterState(this, &mutex_, super_version), nullptr); + + // bump the ref one more time since it will be Unref'ed twice + immutable_iter->RegisterCleanup(CleanupIteratorState, + new IterState(this, &mutex_, super_version->Ref()), nullptr); + return std::make_pair(mutable_iter, immutable_iter); } @@ -2943,7 +2909,6 @@ std::vector<Status> DBImpl::MultiGet( StopWatch sw(env_, options_.statistics.get(), DB_MULTIGET, false); SequenceNumber snapshot; - std::vector<MemTable*> to_delete; mutex_.Lock(); if (options.snapshot != nullptr) { @@ -2952,17 +2917,9 @@ std::vector<Status> DBImpl::MultiGet( snapshot = versions_->LastSequence(); } - // TODO only works for default column family - MemTable* mem = default_cfd_->mem(); - MemTableListVersion* imm = default_cfd_->imm()->current(); - Version* current = default_cfd_->current(); - mem->Ref(); - imm->Ref(); - current->Ref(); - - // Unlock while reading from files and memtables - + SuperVersion* get_version = default_cfd_->GetSuperVersion()->Ref(); mutex_.Unlock(); + bool have_stat_update = false; Version::GetStats stats; @@ -2987,12 +2944,14 @@ std::vector<Status> DBImpl::MultiGet( std::string* value = 
&(*values)[i]; LookupKey lkey(keys[i], snapshot); - if (mem->Get(lkey, value, &s, merge_context, options_)) { + if (get_version->mem->Get(lkey, value, &s, merge_context, options_)) { // Done - } else if (imm->Get(lkey, value, &s, merge_context, options_)) { + } else if (get_version->imm->Get(lkey, value, &s, merge_context, + options_)) { // Done } else { - current->Get(options, lkey, value, &s, &merge_context, &stats, options_); + get_version->current->Get(options, lkey, value, &s, &merge_context, + &stats, options_); have_stat_update = true; } @@ -3001,20 +2960,28 @@ std::vector<Status> DBImpl::MultiGet( } } - // Post processing (decrement reference counts and record statistics) - mutex_.Lock(); - if (!options_.disable_seek_compaction && - have_stat_update && current->UpdateStats(stats)) { - MaybeScheduleFlushOrCompaction(); + bool delete_get_version = false; + if (!options_.disable_seek_compaction && have_stat_update) { + mutex_.Lock(); + if (get_version->current->UpdateStats(stats)) { + MaybeScheduleFlushOrCompaction(); + } + if (get_version->Unref()) { + get_version->Cleanup(); + delete_get_version = true; + } + mutex_.Unlock(); + } else { + if (get_version->Unref()) { + mutex_.Lock(); + get_version->Cleanup(); + mutex_.Unlock(); + delete_get_version = true; + } + } + if (delete_get_version) { + delete get_version; } - MemTable* m = mem->Unref(); - imm->Unref(&to_delete); - current->Unref(); - mutex_.Unlock(); - - // free up all obsolete memtables outside the mutex - delete m; - for (MemTable* v: to_delete) delete v; RecordTick(options_.statistics.get(), NUMBER_MULTIGET_CALLS); RecordTick(options_.statistics.get(), NUMBER_MULTIGET_KEYS_READ, numKeys); diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index cb4952dd84..ab3a63d956 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -56,15 +56,15 @@ Status DBImplReadOnly::Get(const ReadOptions& options, const ColumnFamilyHandle& column_family, const Slice& key, std::string* value) { Status s; - 
MemTable* mem = GetDefaultColumnFamily()->mem(); - Version* current = GetDefaultColumnFamily()->current(); SequenceNumber snapshot = versions_->LastSequence(); + SuperVersion* super_version = GetDefaultColumnFamily()->GetSuperVersion(); MergeContext merge_context; LookupKey lkey(key, snapshot); - if (mem->Get(lkey, value, &s, merge_context, options_)) { + if (super_version->mem->Get(lkey, value, &s, merge_context, options_)) { } else { Version::GetStats stats; - current->Get(options, lkey, value, &s, &merge_context, &stats, options_); + super_version->current->Get(options, lkey, value, &s, &merge_context, + &stats, options_); } return s; } @@ -92,6 +92,11 @@ Status DB::OpenForReadOnly(const Options& options, const std::string& dbname, ColumnFamilyDescriptor(default_column_family_name, cf_options)); Status s = impl->Recover(column_families, true /* read only */, error_if_log_file_exist); + if (s.ok()) { + for (auto cfd : *impl->versions_->GetColumnFamilySet()) { + delete cfd->InstallSuperVersion(new SuperVersion()); + } + } impl->mutex_.Unlock(); if (s.ok()) { *dbptr = impl; diff --git a/db/db_test.cc b/db/db_test.cc index f0bf78e196..65b1dffd3f 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -421,6 +421,10 @@ class DBTest { return DB::Open(*options, dbname_, db); } + Status ReadOnlyReopen(Options* options) { + return DB::OpenForReadOnly(*options, dbname_, &db_); + } + Status TryReopen(Options* options = nullptr) { delete db_; db_ = nullptr; @@ -730,6 +734,26 @@ TEST(DBTest, ReadWrite) { } while (ChangeOptions()); } +TEST(DBTest, ReadOnlyDB) { + ASSERT_OK(Put("foo", "v1")); + ASSERT_OK(Put("bar", "v2")); + ASSERT_OK(Put("foo", "v3")); + Close(); + + Options options; + ASSERT_OK(ReadOnlyReopen(&options)); + ASSERT_EQ("v3", Get("foo")); + ASSERT_EQ("v2", Get("bar")); + Iterator* iter = db_->NewIterator(ReadOptions()); + int count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + ++count; + } + ASSERT_EQ(count, 2); + 
delete iter; +} + // Make sure that when options.block_cache is set, after a new table is // created its index/filter blocks are added to block cache. TEST(DBTest, IndexAndFilterBlocksOfNewTableAddedToCache) { diff --git a/util/env_posix.cc b/util/env_posix.cc index fb9d311b86..638b6c906f 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -1423,9 +1423,6 @@ class PosixEnv : public Env { nullptr, &ThreadPool::BGThreadWrapper, this)); - fprintf(stdout, - "Created bg thread 0x%lx\n", - (unsigned long)t); // Set the thread name to aid debugging #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) diff --git a/util/options.cc b/util/options.cc index ca0efd65a0..1d0a4aea86 100644 --- a/util/options.cc +++ b/util/options.cc @@ -263,11 +263,11 @@ Options::Dump(Logger* log) const Log(log," Options.num_levels: %d", num_levels); Log(log," Options.disableDataSync: %d", disableDataSync); Log(log," Options.use_fsync: %d", use_fsync); - Log(log," Options.max_log_file_size: %ld", max_log_file_size); + Log(log," Options.max_log_file_size: %zu", max_log_file_size); Log(log,"Options.max_manifest_file_size: %lu", (unsigned long)max_manifest_file_size); - Log(log," Options.log_file_time_to_roll: %ld", log_file_time_to_roll); - Log(log," Options.keep_log_file_num: %ld", keep_log_file_num); + Log(log," Options.log_file_time_to_roll: %zu", log_file_time_to_roll); + Log(log," Options.keep_log_file_num: %zu", keep_log_file_num); Log(log," Options.db_stats_log_interval: %d", db_stats_log_interval); Log(log," Options.allow_os_buffer: %d", allow_os_buffer); @@ -323,7 +323,7 @@ Options::Dump(Logger* log) const table_cache_numshardbits); Log(log," Options.table_cache_remove_scan_count_limit: %d", table_cache_remove_scan_count_limit); - Log(log," Options.arena_block_size: %ld", + Log(log," Options.arena_block_size: %zu", arena_block_size); Log(log," Options.delete_obsolete_files_period_micros: %lu", (unsigned long)delete_obsolete_files_period_micros); @@ -343,7 +343,7 @@ 
Options::Dump(Logger* log) const (unsigned long)WAL_ttl_seconds); Log(log," Options.WAL_size_limit_MB: %lu", (unsigned long)WAL_size_limit_MB); - Log(log," Options.manifest_preallocation_size: %ld", + Log(log," Options.manifest_preallocation_size: %zu", manifest_preallocation_size); Log(log," Options.purge_redundant_kvs_while_flush: %d", purge_redundant_kvs_while_flush);