From 4c75e21c205b3cc1d38a0add110aecc1d43fa7ae Mon Sep 17 00:00:00 2001 From: Mike Lin Date: Tue, 31 Dec 2013 17:19:38 -0800 Subject: [PATCH 1/3] Eliminate stdout message when launching a posix thread. This seems out of place as it's the only time RocksDB prints to stdout in the normal course of operations. Thread IDs can still be retrieved from the LOG file: cut -d ' ' -f2 LOG | sort | uniq | egrep -x '[0-9a-f]+' --- util/env_posix.cc | 3 --- 1 file changed, 3 deletions(-) diff --git a/util/env_posix.cc b/util/env_posix.cc index 2be524e95d..15ba161de7 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -1393,9 +1393,6 @@ class PosixEnv : public Env { nullptr, &ThreadPool::BGThreadWrapper, this)); - fprintf(stdout, - "Created bg thread 0x%lx\n", - (unsigned long)t); // Set the thread name to aid debugging #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) From 5c6ef56152f8dc0fe9631332c5e8aa8b20bfbd5e Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Mon, 3 Feb 2014 10:21:58 -0800 Subject: [PATCH 2/3] Fix printf format --- db/compaction_picker.cc | 2 +- db/db_bench.cc | 4 ++-- util/options.cc | 10 +++++----- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 9582b6a29e..28cda9dac4 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -551,7 +551,7 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version) { return nullptr; } Version::FileSummaryStorage tmp; - Log(options_->info_log, "Universal: candidate files(%lu): %s\n", + Log(options_->info_log, "Universal: candidate files(%zu): %s\n", version->files_[level].size(), version->LevelFileSummary(&tmp, 0)); diff --git a/db/db_bench.cc b/db/db_bench.cc index 18258fbb6f..9e6ef70cae 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -1526,14 +1526,14 @@ class Benchmark { ++count; char* tab = std::find(linep, linep + bufferLen, columnSeparator); if (tab == linep + bufferLen) { - fprintf(stderr, "[Error] No Key delimiter TAB at line %ld\n", count); + fprintf(stderr, "[Error] No Key delimiter TAB at line %zu\n", count); continue; } Slice key(linep, tab - linep); tab++; char* endLine = std::find(tab, linep + bufferLen, lineSeparator); if (endLine == linep + bufferLen) { - fprintf(stderr, "[Error] No ENTER at end of line # %ld\n", count); + fprintf(stderr, "[Error] No ENTER at end of line # %zu\n", count); continue; } Slice value(tab, endLine - tab); diff --git a/util/options.cc b/util/options.cc index 671f3d681a..97fffdb18c 100644 --- a/util/options.cc +++ b/util/options.cc @@ -164,11 +164,11 @@ Options::Dump(Logger* log) const Log(log," Options.num_levels: %d", num_levels); Log(log," Options.disableDataSync: %d", disableDataSync); Log(log," Options.use_fsync: %d", use_fsync); - Log(log," Options.max_log_file_size: %ld", max_log_file_size); + Log(log," Options.max_log_file_size: %zu", max_log_file_size); Log(log,"Options.max_manifest_file_size: %lu", (unsigned long)max_manifest_file_size); - Log(log," Options.log_file_time_to_roll: %ld", log_file_time_to_roll); - Log(log," Options.keep_log_file_num: %ld", keep_log_file_num); + Log(log," Options.log_file_time_to_roll: %zu", log_file_time_to_roll); + Log(log," Options.keep_log_file_num: %zu", keep_log_file_num); Log(log," Options.db_stats_log_interval: %d", db_stats_log_interval); Log(log," Options.allow_os_buffer: %d", allow_os_buffer); @@ -224,7 +224,7 @@ Options::Dump(Logger* log) const table_cache_numshardbits); Log(log," Options.table_cache_remove_scan_count_limit: %d", table_cache_remove_scan_count_limit); - Log(log," Options.arena_block_size: %ld", + Log(log," Options.arena_block_size: %zu", arena_block_size); Log(log," Options.delete_obsolete_files_period_micros: %lu", (unsigned long)delete_obsolete_files_period_micros); @@ -244,7 +244,7 @@ Options::Dump(Logger* log) const (unsigned long)WAL_ttl_seconds); Log(log," Options.WAL_size_limit_MB: %lu", (unsigned long)WAL_size_limit_MB); - Log(log," Options.manifest_preallocation_size: %ld", + Log(log," Options.manifest_preallocation_size: %zu", manifest_preallocation_size); Log(log," Options.purge_redundant_kvs_while_flush: %d", purge_redundant_kvs_while_flush); From 5b3b6549d68b61e65c1614ad5f4da115a06a94f0 Mon Sep 17 00:00:00 2001 From: Lei Jin Date: Mon, 3 Feb 2014 13:13:36 -0800 Subject: [PATCH 3/3] use super_version in NewIterator() and MultiGet() function Summary: Use super_version insider NewIterator to avoid Ref() each component separately under mutex The new added bench shows NewIterator QPS increases from 515K to 719K No meaningful improvement for multiget I guess due to its relatively small cost comparing to 90 keys fetch in the test. Test Plan: unit test and db_bench Reviewers: igor, sdong Reviewed By: igor CC: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D15609 --- db/db_bench.cc | 13 ++++ db/db_impl.cc | 161 ++++++++++++++++------------------------- db/db_impl.h | 4 +- db/db_impl_readonly.cc | 11 ++- db/db_test.cc | 24 ++++++ 5 files changed, 110 insertions(+), 103 deletions(-) diff --git a/db/db_bench.cc b/db/db_bench.cc index 9e6ef70cae..8355a3f0c2 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -99,6 +99,7 @@ DEFINE_string(benchmarks, "Must be used with merge_operator\n" "\treadrandommergerandom -- perform N random read-or-merge " "operations. Must be used with merge_operator\n" + "\tnewiterator -- repeated iterator creation\n" "\tseekrandom -- N random seeks\n" "\tcrc32c -- repeated crc32c of 4K of data\n" "\tacquireload -- load N*1000 times\n" @@ -1089,6 +1090,8 @@ class Benchmark { method = &Benchmark::ReadRandom; } else if (name == Slice("readmissing")) { method = &Benchmark::ReadMissing; + } else if (name == Slice("newiterator")) { + method = &Benchmark::IteratorCreation; } else if (name == Slice("seekrandom")) { method = &Benchmark::SeekRandom; } else if (name == Slice("readhot")) { @@ -1877,6 +1880,16 @@ class Benchmark { thread->stats.AddMessage(msg); } + void IteratorCreation(ThreadState* thread) { + Duration duration(FLAGS_duration, reads_); + ReadOptions options(FLAGS_verify_checksum, true); + while (!duration.Done(1)) { + Iterator* iter = db_->NewIterator(options); + delete iter; + thread->stats.FinishedSingleOp(db_); + } + } + void SeekRandom(ThreadState* thread) { Duration duration(FLAGS_duration, reads_); ReadOptions options(FLAGS_verify_checksum, true); diff --git a/db/db_impl.cc b/db/db_impl.cc index 1c4f73b8eb..4dd457e48d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2668,34 +2668,29 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, namespace { struct IterState { + IterState(DBImpl* db, port::Mutex* mu, DBImpl::SuperVersion* super_version) + : db(db), mu(mu), super_version(super_version) {} + + DBImpl* db; port::Mutex* mu; - Version* version = nullptr; - MemTable* mem = nullptr; - MemTableListVersion* imm = nullptr; - DBImpl *db; + DBImpl::SuperVersion* super_version; }; static void CleanupIteratorState(void* arg1, void* arg2) { IterState* state = reinterpret_cast(arg1); DBImpl::DeletionState deletion_state(state->db->GetOptions(). max_write_buffer_number); - state->mu->Lock(); - if (state->mem) { // not set for immutable iterator - MemTable* m = state->mem->Unref(); - if (m != nullptr) { - deletion_state.memtables_to_free.push_back(m); - } + + bool need_cleanup = state->super_version->Unref(); + if (need_cleanup) { + state->mu->Lock(); + state->super_version->Cleanup(); + state->db->FindObsoleteFiles(deletion_state, false, true); + state->mu->Unlock(); + + delete state->super_version; + state->db->PurgeObsoleteFiles(deletion_state); } - if (state->version) { // not set for memtable-only iterator - state->version->Unref(); - } - if (state->imm) { // not set for memtable-only iterator - state->imm->Unref(&deletion_state.memtables_to_free); - } - // fast path FindObsoleteFiles - state->db->FindObsoleteFiles(deletion_state, false, true); - state->mu->Unlock(); - state->db->PurgeObsoleteFiles(deletion_state); delete state; } @@ -2703,36 +2698,23 @@ static void CleanupIteratorState(void* arg1, void* arg2) { Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, SequenceNumber* latest_snapshot) { - IterState* cleanup = new IterState; - MemTable* mutable_mem; - MemTableListVersion* immutable_mems; - Version* version; - - // Collect together all needed child iterators for mem mutex_.Lock(); *latest_snapshot = versions_->LastSequence(); - mem_->Ref(); - mutable_mem = mem_; - // Collect together all needed child iterators for imm_ - immutable_mems = imm_.current(); - immutable_mems->Ref(); - versions_->current()->Ref(); - version = versions_->current(); + SuperVersion* super_version = super_version_->Ref(); mutex_.Unlock(); std::vector iterator_list; - iterator_list.push_back(mutable_mem->NewIterator(options)); - cleanup->mem = mutable_mem; - cleanup->imm = immutable_mems; + // Collect iterator for mutable mem + iterator_list.push_back(super_version->mem->NewIterator(options)); // Collect all needed child iterators for immutable memtables - immutable_mems->AddIterators(options, &iterator_list); + super_version->imm->AddIterators(options, &iterator_list); // Collect iterators for files in L0 - Ln - version->AddIterators(options, storage_options_, &iterator_list); + super_version->current->AddIterators(options, storage_options_, + &iterator_list); Iterator* internal_iter = NewMergingIterator( &internal_comparator_, &iterator_list[0], iterator_list.size()); - cleanup->version = version; - cleanup->mu = &mutex_; - cleanup->db = this; + + IterState* cleanup = new IterState(this, &mutex_, super_version); internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr); return internal_iter; @@ -2747,53 +2729,36 @@ std::pair DBImpl::GetTailingIteratorPair( const ReadOptions& options, uint64_t* superversion_number) { - MemTable* mutable_mem; - MemTableListVersion* immutable_mems; - Version* version; - - // get all child iterators and bump their refcounts under lock mutex_.Lock(); - mutable_mem = mem_; - mutable_mem->Ref(); - immutable_mems = imm_.current(); - immutable_mems->Ref(); - version = versions_->current(); - version->Ref(); + SuperVersion* super_version = super_version_->Ref(); if (superversion_number != nullptr) { *superversion_number = CurrentVersionNumber(); } mutex_.Unlock(); - Iterator* mutable_iter = mutable_mem->NewIterator(options); - IterState* mutable_cleanup = new IterState(); - mutable_cleanup->mem = mutable_mem; - mutable_cleanup->db = this; - mutable_cleanup->mu = &mutex_; - mutable_iter->RegisterCleanup(CleanupIteratorState, mutable_cleanup, nullptr); - + Iterator* mutable_iter = super_version->mem->NewIterator(options); // create a DBIter that only uses memtable content; see NewIterator() mutable_iter = NewDBIterator(&dbname_, env_, options_, user_comparator(), mutable_iter, kMaxSequenceNumber); - Iterator* immutable_iter; - IterState* immutable_cleanup = new IterState(); std::vector list; - immutable_mems->AddIterators(options, &list); - immutable_cleanup->imm = immutable_mems; - version->AddIterators(options, storage_options_, &list); - immutable_cleanup->version = version; - immutable_cleanup->db = this; - immutable_cleanup->mu = &mutex_; - - immutable_iter = + super_version->imm->AddIterators(options, &list); + super_version->current->AddIterators(options, storage_options_, &list); + Iterator* immutable_iter = NewMergingIterator(&internal_comparator_, &list[0], list.size()); - immutable_iter->RegisterCleanup(CleanupIteratorState, immutable_cleanup, - nullptr); // create a DBIter that only uses memtable content; see NewIterator() immutable_iter = NewDBIterator(&dbname_, env_, options_, user_comparator(), immutable_iter, kMaxSequenceNumber); + // register cleanups + mutable_iter->RegisterCleanup(CleanupIteratorState, + new IterState(this, &mutex_, super_version), nullptr); + + // bump the ref one more time since it will be Unref'ed twice + immutable_iter->RegisterCleanup(CleanupIteratorState, + new IterState(this, &mutex_, super_version->Ref()), nullptr); + return std::make_pair(mutable_iter, immutable_iter); } @@ -2924,7 +2889,6 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, StopWatch sw(env_, options_.statistics.get(), DB_MULTIGET, false); SequenceNumber snapshot; - std::vector to_delete; mutex_.Lock(); if (options.snapshot != nullptr) { @@ -2933,16 +2897,9 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, snapshot = versions_->LastSequence(); } - MemTable* mem = mem_; - MemTableListVersion* imm = imm_.current(); - Version* current = versions_->current(); - mem->Ref(); - imm->Ref(); - current->Ref(); - - // Unlock while reading from files and memtables - + SuperVersion* get_version = super_version_->Ref(); mutex_.Unlock(); + bool have_stat_update = false; Version::GetStats stats; @@ -2967,12 +2924,14 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, std::string* value = &(*values)[i]; LookupKey lkey(keys[i], snapshot); - if (mem->Get(lkey, value, &s, merge_context, options_)) { + if (get_version->mem->Get(lkey, value, &s, merge_context, options_)) { // Done - } else if (imm->Get(lkey, value, &s, merge_context, options_)) { + } else if (get_version->imm->Get(lkey, value, &s, merge_context, + options_)) { // Done } else { - current->Get(options, lkey, value, &s, &merge_context, &stats, options_); + get_version->current->Get(options, lkey, value, &s, &merge_context, + &stats, options_); have_stat_update = true; } @@ -2981,20 +2940,28 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, } } - // Post processing (decrement reference counts and record statistics) - mutex_.Lock(); - if (!options_.disable_seek_compaction && - have_stat_update && current->UpdateStats(stats)) { - MaybeScheduleFlushOrCompaction(); + bool delete_get_version = false; + if (!options_.disable_seek_compaction && have_stat_update) { + mutex_.Lock(); + if (get_version->current->UpdateStats(stats)) { + MaybeScheduleFlushOrCompaction(); + } + if (get_version->Unref()) { + get_version->Cleanup(); + delete_get_version = true; + } + mutex_.Unlock(); + } else { + if (get_version->Unref()) { + mutex_.Lock(); + get_version->Cleanup(); + mutex_.Unlock(); + delete_get_version = true; + } + } + if (delete_get_version) { + delete get_version; } - MemTable* m = mem->Unref(); - imm->Unref(&to_delete); - current->Unref(); - mutex_.Unlock(); - - // free up all obsolete memtables outside the mutex - delete m; - for (MemTable* v: to_delete) delete v; RecordTick(options_.statistics.get(), NUMBER_MULTIGET_CALLS); RecordTick(options_.statistics.get(), NUMBER_MULTIGET_KEYS_READ, numKeys); diff --git a/db/db_impl.h b/db/db_impl.h index 545b367aa1..263439d995 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -249,8 +249,8 @@ class DBImpl : public DB { return internal_comparator_.user_comparator(); } - MemTable* GetMemTable() { - return mem_; + SuperVersion* GetSuperVersion() { + return super_version_; } Iterator* NewInternalIterator(const ReadOptions&, diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index f1ffe3ca3d..faa2ff3c7a 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -56,15 +56,15 @@ Status DBImplReadOnly::Get(const ReadOptions& options, const Slice& key, std::string* value) { Status s; - MemTable* mem = GetMemTable(); - Version* current = versions_->current(); SequenceNumber snapshot = versions_->LastSequence(); + SuperVersion* super_version = GetSuperVersion(); MergeContext merge_context; LookupKey lkey(key, snapshot); - if (mem->Get(lkey, value, &s, merge_context, options_)) { + if (super_version->mem->Get(lkey, value, &s, merge_context, options_)) { } else { Version::GetStats stats; - current->Get(options, lkey, value, &s, &merge_context, &stats, options_); + super_version->current->Get(options, lkey, value, &s, &merge_context, + &stats, options_); } return s; } @@ -87,6 +87,9 @@ Status DB::OpenForReadOnly(const Options& options, const std::string& dbname, DBImplReadOnly* impl = new DBImplReadOnly(options, dbname); impl->mutex_.Lock(); Status s = impl->Recover(true /* read only */, error_if_log_file_exist); + if (s.ok()) { + delete impl->InstallSuperVersion(new DBImpl::SuperVersion()); + } impl->mutex_.Unlock(); if (s.ok()) { *dbptr = impl; diff --git a/db/db_test.cc b/db/db_test.cc index baa6e74896..75b5ae2d92 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -421,6 +421,10 @@ class DBTest { return DB::Open(*options, dbname_, db); } + Status ReadOnlyReopen(Options* options) { + return DB::OpenForReadOnly(*options, dbname_, &db_); + } + Status TryReopen(Options* options = nullptr) { delete db_; db_ = nullptr; @@ -727,6 +731,26 @@ TEST(DBTest, ReadWrite) { } while (ChangeOptions()); } +TEST(DBTest, ReadOnlyDB) { + ASSERT_OK(Put("foo", "v1")); + ASSERT_OK(Put("bar", "v2")); + ASSERT_OK(Put("foo", "v3")); + Close(); + + Options options; + ASSERT_OK(ReadOnlyReopen(&options)); + ASSERT_EQ("v3", Get("foo")); + ASSERT_EQ("v2", Get("bar")); + Iterator* iter = db_->NewIterator(ReadOptions()); + int count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + ++count; + } + ASSERT_EQ(count, 2); + delete iter; +} + // Make sure that when options.block_cache is set, after a new table is // created its index/filter blocks are added to block cache. TEST(DBTest, IndexAndFilterBlocksOfNewTableAddedToCache) {