diff --git a/INSTALL.md b/INSTALL.md index ed1cbfba10..07d9750687 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -19,13 +19,13 @@ libraries. You are on your own. * **Linux** * Upgrade your gcc to version at least 4.7 to get C++11 support. - * Install gflags. If you're on Ubuntu, here's a nice tutorial: + * Install gflags. First, try: `sudo apt-get install libgflags-dev`. + If this doesn't work and you're using Ubuntu, here's a nice tutorial: (http://askubuntu.com/questions/312173/installing-gflags-12-04) * Install snappy. This is usually as easy as: `sudo apt-get install libsnappy-dev`. * Install zlib. Try: `sudo apt-get install zlib1g-dev`. * Install bzip2: `sudo apt-get install libbz2-dev`. - * Install gflags: `sudo apt-get install libgflags-dev`. * **OS X**: * Install latest C++ compiler that supports C++ 11: * Update XCode: run `xcode-select --install` (or install it from XCode App's settting). diff --git a/README b/README index 076f66d74e..c55149d443 100644 --- a/README +++ b/README @@ -16,8 +16,8 @@ The core of this code has been derived from open-source leveldb. The code under this directory implements a system for maintaining a persistent key/value store. -See doc/index.html for more explanation. -See doc/impl.html for a brief overview of the implementation. +See doc/index.html and github wiki (https://github.com/facebook/rocksdb/wiki) +for more explanation. The public interface is in include/*. Callers should not include or rely on the details of any other header files in this package. 
Those diff --git a/build_tools/regression_build_test.sh b/build_tools/regression_build_test.sh index 6ede474664..1c44e5ad27 100755 --- a/build_tools/regression_build_test.sh +++ b/build_tools/regression_build_test.sh @@ -65,7 +65,7 @@ OPT=-DNDEBUG make db_bench -j$(nproc) --sync=0 \ --threads=8 > ${STAT_FILE}.overwrite -# fill up the db for readrandom benchmark +# fill up the db for readrandom benchmark (1GB total size) ./db_bench \ --benchmarks=fillseq \ --db=$DATA_DIR \ @@ -83,7 +83,7 @@ OPT=-DNDEBUG make db_bench -j$(nproc) --sync=0 \ --threads=1 > /dev/null -# measure readrandom +# measure readrandom with 6GB block cache ./db_bench \ --benchmarks=readrandom \ --db=$DATA_DIR \ @@ -102,6 +102,25 @@ OPT=-DNDEBUG make db_bench -j$(nproc) --sync=0 \ --threads=32 > ${STAT_FILE}.readrandom +# measure readrandom with 300MB block cache +./db_bench \ + --benchmarks=readrandom \ + --db=$DATA_DIR \ + --use_existing_db=1 \ + --bloom_bits=10 \ + --num=$NUM \ + --reads=$NUM \ + --cache_size=314572800 \ + --cache_numshardbits=8 \ + --open_files=55000 \ + --disable_seek_compaction=1 \ + --statistics=1 \ + --histogram=1 \ + --disable_data_sync=1 \ + --disable_wal=1 \ + --sync=0 \ + --threads=32 > ${STAT_FILE}.readrandomsmallblockcache + # measure memtable performance -- none of the data gets flushed to disk ./db_bench \ --benchmarks=fillrandom,readrandom, \ @@ -154,5 +173,6 @@ function send_benchmark_to_ods { send_benchmark_to_ods overwrite overwrite $STAT_FILE.overwrite send_benchmark_to_ods fillseq fillseq $STAT_FILE.fillseq send_benchmark_to_ods readrandom readrandom $STAT_FILE.readrandom +send_benchmark_to_ods readrandom readrandom_smallblockcache $STAT_FILE.readrandomsmallblockcache send_benchmark_to_ods fillrandom memtablefillrandom $STAT_FILE.memtablefillreadrandom send_benchmark_to_ods readrandom memtablereadrandom $STAT_FILE.memtablefillreadrandom diff --git a/db/db_bench.cc b/db/db_bench.cc index 3ab130093c..16b6643c2a 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc 
@@ -191,6 +191,10 @@ DEFINE_int32(universal_max_merge_width, 0, "The max number of files to compact" DEFINE_int32(universal_max_size_amplification_percent, 0, "The max size amplification for universal style compaction"); +DEFINE_int32(universal_compression_size_percent, -1, + "The percentage of the database to compress for universal " + "compaction. -1 means compress everything."); + DEFINE_int64(cache_size, -1, "Number of bytes to use as a cache of uncompressed" "data. Negative means use default settings."); @@ -325,6 +329,23 @@ DEFINE_string(compression_type, "snappy", static enum rocksdb::CompressionType FLAGS_compression_type_e = rocksdb::kSnappyCompression; +DEFINE_int32(compression_level, -1, + "Compression level. For zlib this should be -1 for the " + "default level, or between 0 and 9."); + +static bool ValidateCompressionLevel(const char* flagname, int32_t value) { + if (value < -1 || value > 9) { + fprintf(stderr, "Invalid value for --%s: %d, must be between -1 and 9\n", + flagname, value); + return false; + } + return true; +} + +static const bool FLAGS_compression_level_dummy = + google::RegisterFlagValidator(&FLAGS_compression_level, + &ValidateCompressionLevel); + DEFINE_int32(min_level_to_compress, -1, "If non-negative, compression starts" " from this level. Levels with number < min_level_to_compress are" " not compressed. 
Otherwise, apply compression_type to " @@ -434,12 +455,11 @@ static bool ValidatePrefixSize(const char* flagname, int32_t value) { } return true; } -DEFINE_int32(prefix_size, 0, "Control the prefix size for PrefixHashRep"); +DEFINE_int32(prefix_size, 0, "Control the prefix size for HashSkipList"); enum RepFactory { kSkipList, kPrefixHash, - kUnsorted, kVectorRep }; enum RepFactory StringToRepFactory(const char* ctype) { @@ -449,8 +469,6 @@ enum RepFactory StringToRepFactory(const char* ctype) { return kSkipList; else if (!strcasecmp(ctype, "prefix_hash")) return kPrefixHash; - else if (!strcasecmp(ctype, "unsorted")) - return kUnsorted; else if (!strcasecmp(ctype, "vector")) return kVectorRep; @@ -807,9 +825,6 @@ class Benchmark { case kSkipList: fprintf(stdout, "Memtablerep: skip_list\n"); break; - case kUnsorted: - fprintf(stdout, "Memtablerep: unsorted\n"); - break; case kVectorRep: fprintf(stdout, "Memtablerep: vector\n"); break; @@ -1334,14 +1349,8 @@ class Benchmark { } switch (FLAGS_rep_factory) { case kPrefixHash: - options.memtable_factory.reset( - new PrefixHashRepFactory(NewFixedPrefixTransform(FLAGS_prefix_size)) - ); - break; - case kUnsorted: - options.memtable_factory.reset( - new UnsortedRepFactory - ); + options.memtable_factory.reset(NewHashSkipListRepFactory( + NewFixedPrefixTransform(FLAGS_prefix_size))); break; case kSkipList: // no need to do anything @@ -1368,6 +1377,7 @@ class Benchmark { options.level0_slowdown_writes_trigger = FLAGS_level0_slowdown_writes_trigger; options.compression = FLAGS_compression_type_e; + options.compression_opts.level = FLAGS_compression_level; options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds; options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB; if (FLAGS_min_level_to_compress >= 0) { @@ -1429,6 +1439,10 @@ class Benchmark { options.compaction_options_universal.max_size_amplification_percent = FLAGS_universal_max_size_amplification_percent; } + if (FLAGS_universal_compression_size_percent != -1) { + 
options.compaction_options_universal.compression_size_percent = + FLAGS_universal_compression_size_percent; + } Status s; if(FLAGS_readonly) { diff --git a/db/db_impl.cc b/db/db_impl.cc index 6875cb37c1..b1d016ffd3 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -51,6 +51,7 @@ #include "util/auto_roll_logger.h" #include "util/build_version.h" #include "util/coding.h" +#include "util/hash_skiplist_rep.h" #include "util/logging.h" #include "util/mutexlock.h" #include "util/perf_context_imp.h" @@ -163,10 +164,10 @@ Options SanitizeOptions(const std::string& dbname, Log(result.info_log, "Compaction filter specified, ignore factory"); } if (result.prefix_extractor) { - // If a prefix extractor has been supplied and a PrefixHashRepFactory is + // If a prefix extractor has been supplied and a HashSkipListRepFactory is // being used, make sure that the latter uses the former as its transform // function. - auto factory = dynamic_cast( + auto factory = dynamic_cast( result.memtable_factory.get()); if (factory && factory->GetTransform() != result.prefix_extractor) { @@ -236,7 +237,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) mutex_(options.use_adaptive_mutex), shutting_down_(nullptr), bg_cv_(&mutex_), - mem_rep_factory_(options_.memtable_factory), + mem_rep_factory_(options_.memtable_factory.get()), mem_(new MemTable(internal_comparator_, mem_rep_factory_, NumberLevels(), options_)), logfile_number_(0), @@ -516,6 +517,19 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state, // files in sst_delete_files and log_delete_files. // It is not necessary to hold the mutex when invoking this method. 
void DBImpl::PurgeObsoleteFiles(DeletionState& state) { + + // free pending memtables + for (auto m : state.memtables_to_free) { + delete m; + } + + // check if there is anything to do + if (!state.all_files.size() && + !state.sst_delete_files.size() && + !state.log_delete_files.size()) { + return; + } + // this checks if FindObsoleteFiles() was run before. If not, don't do // PurgeObsoleteFiles(). If FindObsoleteFiles() was run, we need to also // run PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true @@ -1170,7 +1184,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, // Replace immutable memtable with the generated Table s = imm_.InstallMemtableFlushResults( mems, versions_.get(), s, &mutex_, options_.info_log.get(), - file_number, pending_outputs_); + file_number, pending_outputs_, &deletion_state.memtables_to_free); if (s.ok()) { if (madeProgress) { @@ -1656,7 +1670,7 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, void DBImpl::BackgroundCallFlush() { bool madeProgress = false; - DeletionState deletion_state; + DeletionState deletion_state(options_.max_write_buffer_number); assert(bg_flush_scheduled_); MutexLock l(&mutex_); @@ -1702,7 +1716,7 @@ void DBImpl::TEST_PurgeObsoleteteWAL() { void DBImpl::BackgroundCallCompaction() { bool madeProgress = false; - DeletionState deletion_state; + DeletionState deletion_state(options_.max_write_buffer_number); MaybeDumpStats(); @@ -1732,6 +1746,7 @@ void DBImpl::BackgroundCallCompaction() { // FindObsoleteFiles(). This is because deletion_state does not catch // all created files if compaction failed. 
FindObsoleteFiles(deletion_state, !s.ok()); + // delete unnecessary files if any, this is done outside the mutex if (deletion_state.HaveSomethingToDelete()) { mutex_.Unlock(); @@ -2492,25 +2507,20 @@ struct IterState { static void CleanupIteratorState(void* arg1, void* arg2) { IterState* state = reinterpret_cast(arg1); - std::vector to_delete; - to_delete.reserve(state->mem.size()); + DBImpl::DeletionState deletion_state(state->db->GetOptions(). + max_write_buffer_number); state->mu->Lock(); for (unsigned int i = 0; i < state->mem.size(); i++) { MemTable* m = state->mem[i]->Unref(); if (m != nullptr) { - to_delete.push_back(m); + deletion_state.memtables_to_free.push_back(m); } } state->version->Unref(); - // delete only the sst obsolete files - DBImpl::DeletionState deletion_state; // fast path FindObsoleteFiles state->db->FindObsoleteFiles(deletion_state, false, true); state->mu->Unlock(); state->db->PurgeObsoleteFiles(deletion_state); - - // delete obsolete memtables outside the db-mutex - for (MemTable* m : to_delete) delete m; delete state; } } // namespace @@ -2612,8 +2622,10 @@ Status DBImpl::GetImpl(const ReadOptions& options, BumpPerfTime(&perf_context.get_snapshot_time, &snapshot_timer); if (mem->Get(lkey, value, &s, merge_context, options_)) { // Done + RecordTick(options_.statistics.get(), MEMTABLE_HIT); } else if (imm.Get(lkey, value, &s, merge_context, options_)) { // Done + RecordTick(options_.statistics.get(), MEMTABLE_HIT); } else { StopWatchNano from_files_timer(env_, false); StartPerfTimer(&from_files_timer); @@ -2622,6 +2634,7 @@ Status DBImpl::GetImpl(const ReadOptions& options, options_, value_found); have_stat_update = true; BumpPerfTime(&perf_context.get_from_output_files_time, &from_files_timer); + RecordTick(options_.statistics.get(), MEMTABLE_MISS); } StopWatchNano post_process_timer(env_, false); @@ -3514,6 +3527,33 @@ void DBImpl::GetLiveFilesMetaData(std::vector *metadata) { return versions_->GetLiveFilesMetaData(metadata); } +Status 
DBImpl::GetDbIdentity(std::string& identity) { + std::string idfilename = IdentityFileName(dbname_); + unique_ptr idfile; + const EnvOptions soptions; + Status s = env_->NewSequentialFile(idfilename, &idfile, soptions); + if (!s.ok()) { + return s; + } + uint64_t file_size; + s = env_->GetFileSize(idfilename, &file_size); + if (!s.ok()) { + return s; + } + char buffer[file_size]; + Slice id; + s = idfile->Read(file_size, &id, buffer); + if (!s.ok()) { + return s; + } + identity.assign(id.ToString()); + // If last character is '\n' remove it from identity + if (identity.size() > 0 && identity.back() == '\n') { + identity.pop_back(); + } + return s; +} + // Default implementations of convenience methods that subclasses of DB // can call if they wish Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { diff --git a/db/db_impl.h b/db/db_impl.h index 8a57b92f5d..d7a346b6ea 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -85,6 +85,8 @@ class DBImpl : public DB { virtual void GetLiveFilesMetaData( std::vector *metadata); + virtual Status GetDbIdentity(std::string& identity); + // Extra methods (for testing) that are not in the public DB interface // Compact any files in the named level that overlap [*begin, *end] @@ -129,10 +131,12 @@ class DBImpl : public DB { struct DeletionState { inline bool HaveSomethingToDelete() const { - return all_files.size() || + return memtables_to_free.size() || + all_files.size() || sst_delete_files.size() || log_delete_files.size(); } + // a list of all files that we'll consider deleting // (every once in a while this is filled up with all files // in the DB directory) @@ -147,14 +151,18 @@ class DBImpl : public DB { // a list of log files that we need to delete std::vector log_delete_files; + // a list of memtables to be free + std::vector memtables_to_free; + // the current manifest_file_number, log_number and prev_log_number // that corresponds to the set of files in 'live'. 
uint64_t manifest_file_number, log_number, prev_log_number; - DeletionState() { + explicit DeletionState(const int num_memtables = 0) { manifest_file_number = 0; log_number = 0; prev_log_number = 0; + memtables_to_free.reserve(num_memtables); } }; @@ -309,7 +317,7 @@ class DBImpl : public DB { port::Mutex mutex_; port::AtomicPointer shutting_down_; port::CondVar bg_cv_; // Signalled when background work finishes - std::shared_ptr mem_rep_factory_; + MemTableRepFactory* mem_rep_factory_; MemTable* mem_; MemTableList imm_; // Memtable that are not changing uint64_t logfile_number_; diff --git a/db/db_iter.cc b/db/db_iter.cc index 0ee4210054..1b5ae5688f 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -61,7 +61,7 @@ class DBIter: public Iterator { const Comparator* cmp, Iterator* iter, SequenceNumber s) : dbname_(dbname), env_(env), - logger_(options.info_log), + logger_(options.info_log.get()), user_comparator_(cmp), user_merge_operator_(options.merge_operator.get()), iter_(iter), @@ -123,7 +123,7 @@ class DBIter: public Iterator { const std::string* const dbname_; Env* const env_; - shared_ptr logger_; + Logger* logger_; const Comparator* const user_comparator_; const MergeOperator* const user_merge_operator_; Iterator* const iter_; @@ -302,7 +302,7 @@ void DBIter::MergeValuesNewToOld() { // ignore corruption if there is any. const Slice value = iter_->value(); user_merge_operator_->FullMerge(ikey.user_key, &value, operands, - &saved_value_, logger_.get()); + &saved_value_, logger_); // iter_ is positioned after put iter_->Next(); return; @@ -319,7 +319,7 @@ void DBIter::MergeValuesNewToOld() { Slice(operands[0]), Slice(operands[1]), &merge_result, - logger_.get())) { + logger_)) { operands.pop_front(); swap(operands.front(), merge_result); } else { @@ -336,7 +336,7 @@ void DBIter::MergeValuesNewToOld() { // feed null as the existing value to the merge operator, such that // client can differentiate this scenario and do things accordingly. 
user_merge_operator_->FullMerge(saved_key_, nullptr, operands, - &saved_value_, logger_.get()); + &saved_value_, logger_); } void DBIter::Prev() { diff --git a/db/db_statistics.cc b/db/db_statistics.cc new file mode 100644 index 0000000000..f0cfd6740f --- /dev/null +++ b/db/db_statistics.cc @@ -0,0 +1,14 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "db/db_statistics.h" + +namespace rocksdb { + +std::shared_ptr CreateDBStatistics() { + return std::make_shared(); +} + +} // namespace rocksdb diff --git a/db/db_statistics.h b/db/db_statistics.h index 87bc863048..ec71e16883 100644 --- a/db/db_statistics.h +++ b/db/db_statistics.h @@ -58,8 +58,6 @@ class DBStatistics: public Statistics { std::vector allHistograms_; }; -std::shared_ptr CreateDBStatistics() { - return std::make_shared(); -} +std::shared_ptr CreateDBStatistics(); } // namespace rocksdb diff --git a/db/db_test.cc b/db/db_test.cc index c9139282f7..0e94981d2d 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -245,7 +245,6 @@ class DBTest { enum OptionConfig { kDefault, kVectorRep, - kUnsortedRep, kMergePut, kFilter, kUncompressed, @@ -256,7 +255,7 @@ class DBTest { kCompactOnFlush, kPerfOptions, kDeletesFilterFirst, - kPrefixHashRep, + kHashSkipList, kUniversalCompaction, kCompressedBlockCache, kEnd @@ -340,9 +339,9 @@ class DBTest { Options CurrentOptions() { Options options; switch (option_config_) { - case kPrefixHashRep: - options.memtable_factory.reset(new - PrefixHashRepFactory(NewFixedPrefixTransform(1))); + case kHashSkipList: + options.memtable_factory.reset( + NewHashSkipListRepFactory(NewFixedPrefixTransform(1))); break; case kMergePut: options.merge_operator = MergeOperators::CreatePutOperator(); @@ -376,9 +375,6 @@ class DBTest { case 
kDeletesFilterFirst: options.filter_deletes = true; break; - case kUnsortedRep: - options.memtable_factory.reset(new UnsortedRepFactory); - break; case kVectorRep: options.memtable_factory.reset(new VectorRepFactory(100)); break; @@ -1776,31 +1772,23 @@ TEST(DBTest, ManifestRollOver) { TEST(DBTest, IdentityAcrossRestarts) { do { - std::string idfilename = IdentityFileName(dbname_); - unique_ptr idfile; - const EnvOptions soptions; - ASSERT_OK(env_->NewSequentialFile(idfilename, &idfile, soptions)); - char buffer1[100]; - Slice id1; - ASSERT_OK(idfile->Read(100, &id1, buffer1)); + std::string id1; + ASSERT_OK(db_->GetDbIdentity(id1)); Options options = CurrentOptions(); Reopen(&options); - char buffer2[100]; - Slice id2; - ASSERT_OK(env_->NewSequentialFile(idfilename, &idfile, soptions)); - ASSERT_OK(idfile->Read(100, &id2, buffer2)); + std::string id2; + ASSERT_OK(db_->GetDbIdentity(id2)); // id1 should match id2 because identity was not regenerated - ASSERT_EQ(id1.ToString(), id2.ToString()); + ASSERT_EQ(id1.compare(id2), 0); + std::string idfilename = IdentityFileName(dbname_); ASSERT_OK(env_->DeleteFile(idfilename)); Reopen(&options); - char buffer3[100]; - Slice id3; - ASSERT_OK(env_->NewSequentialFile(idfilename, &idfile, soptions)); - ASSERT_OK(idfile->Read(100, &id3, buffer3)); - // id1 should NOT match id2 because identity was regenerated - ASSERT_NE(id1.ToString(0), id3.ToString()); + std::string id3; + ASSERT_OK(db_->GetDbIdentity(id3)); + // id1 should NOT match id3 because identity was regenerated + ASSERT_NE(id1.compare(id3), 0); } while (ChangeCompactOptions()); } @@ -1856,94 +1844,6 @@ TEST(DBTest, CompactionsGenerateMultipleFiles) { } } -// TODO(kailiu) disable the in non-linux platforms to temporarily solve -// the unit test failure. -#ifdef OS_LINUX -TEST(DBTest, CompressedCache) { - int num_iter = 80; - - // Run this test three iterations. 
- // Iteration 1: only a uncompressed block cache - // Iteration 2: only a compressed block cache - // Iteration 3: both block cache and compressed cache - for (int iter = 0; iter < 3; iter++) { - Options options = CurrentOptions(); - options.write_buffer_size = 64*1024; // small write buffer - options.statistics = rocksdb::CreateDBStatistics(); - - switch (iter) { - case 0: - // only uncompressed block cache - options.block_cache = NewLRUCache(8*1024); - options.block_cache_compressed = nullptr; - break; - case 1: - // no block cache, only compressed cache - options.no_block_cache = true; - options.block_cache = nullptr; - options.block_cache_compressed = NewLRUCache(8*1024); - break; - case 2: - // both compressed and uncompressed block cache - options.block_cache = NewLRUCache(1024); - options.block_cache_compressed = NewLRUCache(8*1024); - break; - default: - ASSERT_TRUE(false); - } - Reopen(&options); - - Random rnd(301); - - // Write 8MB (80 values, each 100K) - ASSERT_EQ(NumTableFilesAtLevel(0), 0); - std::vector values; - std::string str; - for (int i = 0; i < num_iter; i++) { - if (i % 4 == 0) { // high compression ratio - str = RandomString(&rnd, 1000); - } - values.push_back(str); - ASSERT_OK(Put(Key(i), values[i])); - } - - // flush all data from memtable so that reads are from block cache - dbfull()->Flush(FlushOptions()); - - for (int i = 0; i < num_iter; i++) { - ASSERT_EQ(Get(Key(i)), values[i]); - } - - // check that we triggered the appropriate code paths in the cache - switch (iter) { - case 0: - // only uncompressed block cache - ASSERT_GT(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS), - 0); - ASSERT_EQ(options.statistics.get()->getTickerCount - (BLOCK_CACHE_COMPRESSED_MISS), 0); - break; - case 1: - // no block cache, only compressed cache - ASSERT_EQ(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS), - 0); - ASSERT_GT(options.statistics.get()->getTickerCount - (BLOCK_CACHE_COMPRESSED_MISS), 0); - break; - case 2: - // 
both compressed and uncompressed block cache - ASSERT_GT(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS), - 0); - ASSERT_GT(options.statistics.get()->getTickerCount - (BLOCK_CACHE_COMPRESSED_MISS), 0); - break; - default: - ASSERT_TRUE(false); - } - } -} -#endif - TEST(DBTest, CompactionTrigger) { Options options = CurrentOptions(); options.write_buffer_size = 100<<10; //100KB @@ -2185,9 +2085,91 @@ TEST(DBTest, UniversalCompactionOptions) { } } -// TODO(kailiu) disable the in non-linux platforms to temporarily solve -// the unit test failure. -#ifdef OS_LINUX +#if defined(SNAPPY) && defined(ZLIB) && defined(BZIP2) +TEST(DBTest, CompressedCache) { + int num_iter = 80; + + // Run this test three iterations. + // Iteration 1: only a uncompressed block cache + // Iteration 2: only a compressed block cache + // Iteration 3: both block cache and compressed cache + for (int iter = 0; iter < 3; iter++) { + Options options = CurrentOptions(); + options.write_buffer_size = 64*1024; // small write buffer + options.statistics = rocksdb::CreateDBStatistics(); + + switch (iter) { + case 0: + // only uncompressed block cache + options.block_cache = NewLRUCache(8*1024); + options.block_cache_compressed = nullptr; + break; + case 1: + // no block cache, only compressed cache + options.no_block_cache = true; + options.block_cache = nullptr; + options.block_cache_compressed = NewLRUCache(8*1024); + break; + case 2: + // both compressed and uncompressed block cache + options.block_cache = NewLRUCache(1024); + options.block_cache_compressed = NewLRUCache(8*1024); + break; + default: + ASSERT_TRUE(false); + } + Reopen(&options); + + Random rnd(301); + + // Write 8MB (80 values, each 100K) + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + std::vector values; + std::string str; + for (int i = 0; i < num_iter; i++) { + if (i % 4 == 0) { // high compression ratio + str = RandomString(&rnd, 1000); + } + values.push_back(str); + ASSERT_OK(Put(Key(i), values[i])); + } + + // flush all 
data from memtable so that reads are from block cache + dbfull()->Flush(FlushOptions()); + + for (int i = 0; i < num_iter; i++) { + ASSERT_EQ(Get(Key(i)), values[i]); + } + + // check that we triggered the appropriate code paths in the cache + switch (iter) { + case 0: + // only uncompressed block cache + ASSERT_GT(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS), + 0); + ASSERT_EQ(options.statistics.get()->getTickerCount + (BLOCK_CACHE_COMPRESSED_MISS), 0); + break; + case 1: + // no block cache, only compressed cache + ASSERT_EQ(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS), + 0); + ASSERT_GT(options.statistics.get()->getTickerCount + (BLOCK_CACHE_COMPRESSED_MISS), 0); + break; + case 2: + // both compressed and uncompressed block cache + ASSERT_GT(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS), + 0); + ASSERT_GT(options.statistics.get()->getTickerCount + (BLOCK_CACHE_COMPRESSED_MISS), 0); + break; + default: + ASSERT_TRUE(false); + } + } +} + static std::string CompressibleString(Random* rnd, int len) { std::string r; test::CompressibleString(rnd, 0.8, len, &r); @@ -4535,6 +4517,10 @@ class ModelDB: public DB { return Status::OK(); } + virtual Status GetDbIdentity(std::string& identity) { + return Status::OK(); + } + virtual SequenceNumber GetLatestSequenceNumber() const { return 0; } @@ -4647,7 +4633,7 @@ TEST(DBTest, Randomized) { // TODO(sanjay): Test Get() works int p = rnd.Uniform(100); int minimum = 0; - if (option_config_ == kPrefixHashRep) { + if (option_config_ == kHashSkipList) { minimum = 1; } if (p < 45) { // Put @@ -4817,90 +4803,82 @@ void PrefixScanInit(DBTest *dbtest) { } TEST(DBTest, PrefixScan) { - for (int it = 0; it < 2; ++it) { - ReadOptions ro = ReadOptions(); - int count; - Slice prefix; - Slice key; - char buf[100]; - Iterator* iter; - snprintf(buf, sizeof(buf), "03______:"); - prefix = Slice(buf, 8); - key = Slice(buf, 9); - auto prefix_extractor = NewFixedPrefixTransform(8); - // db configs - 
env_->count_random_reads_ = true; - Options options = CurrentOptions(); - options.env = env_; - options.no_block_cache = true; - options.filter_policy = NewBloomFilterPolicy(10); - options.prefix_extractor = prefix_extractor; - options.whole_key_filtering = false; - options.disable_auto_compactions = true; - options.max_background_compactions = 2; - options.create_if_missing = true; - options.disable_seek_compaction = true; - if (it == 0) { - options.memtable_factory.reset(NewHashSkipListRepFactory( - prefix_extractor)); - } else { - options.memtable_factory = std::make_shared( - prefix_extractor); - } + ReadOptions ro = ReadOptions(); + int count; + Slice prefix; + Slice key; + char buf[100]; + Iterator* iter; + snprintf(buf, sizeof(buf), "03______:"); + prefix = Slice(buf, 8); + key = Slice(buf, 9); + auto prefix_extractor = NewFixedPrefixTransform(8); + // db configs + env_->count_random_reads_ = true; + Options options = CurrentOptions(); + options.env = env_; + options.no_block_cache = true; + options.filter_policy = NewBloomFilterPolicy(10); + options.prefix_extractor = prefix_extractor; + options.whole_key_filtering = false; + options.disable_auto_compactions = true; + options.max_background_compactions = 2; + options.create_if_missing = true; + options.disable_seek_compaction = true; + options.memtable_factory.reset(NewHashSkipListRepFactory(prefix_extractor)); - // prefix specified, with blooms: 2 RAND I/Os - // SeekToFirst - DestroyAndReopen(&options); - PrefixScanInit(this); - count = 0; - env_->random_read_counter_.Reset(); - ro.prefix = &prefix; - iter = db_->NewIterator(ro); - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { - assert(iter->key().starts_with(prefix)); - count++; - } - ASSERT_OK(iter->status()); - delete iter; - ASSERT_EQ(count, 2); - ASSERT_EQ(env_->random_read_counter_.Read(), 2); - - // prefix specified, with blooms: 2 RAND I/Os - // Seek - DestroyAndReopen(&options); - PrefixScanInit(this); - count = 0; - 
env_->random_read_counter_.Reset(); - ro.prefix = &prefix; - iter = db_->NewIterator(ro); - for (iter->Seek(key); iter->Valid(); iter->Next()) { - assert(iter->key().starts_with(prefix)); - count++; - } - ASSERT_OK(iter->status()); - delete iter; - ASSERT_EQ(count, 2); - ASSERT_EQ(env_->random_read_counter_.Read(), 2); - - // no prefix specified: 11 RAND I/Os - DestroyAndReopen(&options); - PrefixScanInit(this); - count = 0; - env_->random_read_counter_.Reset(); - iter = db_->NewIterator(ReadOptions()); - for (iter->Seek(prefix); iter->Valid(); iter->Next()) { - if (! iter->key().starts_with(prefix)) { - break; - } - count++; - } - ASSERT_OK(iter->status()); - delete iter; - ASSERT_EQ(count, 2); - ASSERT_EQ(env_->random_read_counter_.Read(), 11); - Close(); - delete options.filter_policy; + // prefix specified, with blooms: 2 RAND I/Os + // SeekToFirst + DestroyAndReopen(&options); + PrefixScanInit(this); + count = 0; + env_->random_read_counter_.Reset(); + ro.prefix = &prefix; + iter = db_->NewIterator(ro); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + assert(iter->key().starts_with(prefix)); + count++; } + ASSERT_OK(iter->status()); + delete iter; + ASSERT_EQ(count, 2); + ASSERT_EQ(env_->random_read_counter_.Read(), 2); + + // prefix specified, with blooms: 2 RAND I/Os + // Seek + DestroyAndReopen(&options); + PrefixScanInit(this); + count = 0; + env_->random_read_counter_.Reset(); + ro.prefix = &prefix; + iter = db_->NewIterator(ro); + for (iter->Seek(key); iter->Valid(); iter->Next()) { + assert(iter->key().starts_with(prefix)); + count++; + } + ASSERT_OK(iter->status()); + delete iter; + ASSERT_EQ(count, 2); + ASSERT_EQ(env_->random_read_counter_.Read(), 2); + + // no prefix specified: 11 RAND I/Os + DestroyAndReopen(&options); + PrefixScanInit(this); + count = 0; + env_->random_read_counter_.Reset(); + iter = db_->NewIterator(ReadOptions()); + for (iter->Seek(prefix); iter->Valid(); iter->Next()) { + if (! 
iter->key().starts_with(prefix)) { + break; + } + count++; + } + ASSERT_OK(iter->status()); + delete iter; + ASSERT_EQ(count, 2); + ASSERT_EQ(env_->random_read_counter_.Read(), 11); + Close(); + delete options.filter_policy; } std::string MakeKey(unsigned int num) { diff --git a/db/memtable.cc b/db/memtable.cc index b4df915e0b..f396bc0826 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -36,7 +36,7 @@ struct hash { namespace rocksdb { MemTable::MemTable(const InternalKeyComparator& cmp, - std::shared_ptr table_factory, + MemTableRepFactory* table_factory, int numlevel, const Options& options) : comparator_(cmp), @@ -281,7 +281,7 @@ bool MemTable::Update(SequenceNumber seq, ValueType type, Slice memkey = lkey.memtable_key(); std::shared_ptr iter( - table_.get()->GetIterator(lkey.user_key())); + table_->GetIterator(lkey.user_key())); iter->Seek(key, memkey.data()); if (iter->Valid()) { diff --git a/db/memtable.h b/db/memtable.h index 8a3a8610cc..7edb5681da 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -36,7 +36,7 @@ class MemTable { // is zero and the caller must call Ref() at least once. explicit MemTable( const InternalKeyComparator& comparator, - std::shared_ptr table_factory, + MemTableRepFactory* table_factory, int numlevel = 7, const Options& options = Options()); diff --git a/db/memtablelist.cc b/db/memtablelist.cc index 48725590bf..71e4e5a923 100644 --- a/db/memtablelist.cc +++ b/db/memtablelist.cc @@ -80,7 +80,8 @@ Status MemTableList::InstallMemtableFlushResults( VersionSet* vset, Status flushStatus, port::Mutex* mu, Logger* info_log, uint64_t file_number, - std::set& pending_outputs) { + std::set& pending_outputs, + std::vector* to_delete) { mu->AssertHeld(); // If the flush was not successful, then just reset state. @@ -151,7 +152,9 @@ Status MemTableList::InstallMemtableFlushResults( // executing compaction threads do not mistakenly assume that this // file is not live. 
pending_outputs.erase(m->file_number_); - m->Unref(); + if (m->Unref() != nullptr) { + to_delete->push_back(m); + } size_--; } else { //commit failed. setup state so that we can flush again. diff --git a/db/memtablelist.h b/db/memtablelist.h index 5f36752f4c..ed353c8b87 100644 --- a/db/memtablelist.h +++ b/db/memtablelist.h @@ -65,7 +65,8 @@ class MemTableList { VersionSet* vset, Status flushStatus, port::Mutex* mu, Logger* info_log, uint64_t file_number, - std::set& pending_outputs); + std::set& pending_outputs, + std::vector* to_delete); // New memtables are inserted at the front of the list. // Takes ownership of the referenced held on *m by the caller of Add(). diff --git a/db/perf_context_test.cc b/db/perf_context_test.cc index 2a6e6b7e48..472cc719a2 100644 --- a/db/perf_context_test.cc +++ b/db/perf_context_test.cc @@ -38,8 +38,8 @@ std::shared_ptr OpenDb() { if (FLAGS_use_set_based_memetable) { auto prefix_extractor = rocksdb::NewFixedPrefixTransform(0); - options.memtable_factory = - std::make_shared(prefix_extractor); + options.memtable_factory.reset( + NewHashSkipListRepFactory(prefix_extractor)); } Status s = DB::Open(options, kDbName, &db); diff --git a/db/prefix_test.cc b/db/prefix_test.cc index 4b15e63e3d..d76285381f 100644 --- a/db/prefix_test.cc +++ b/db/prefix_test.cc @@ -11,7 +11,6 @@ #include "util/testharness.h" DEFINE_bool(use_prefix_hash_memtable, true, ""); -DEFINE_bool(use_nolock_version, true, ""); DEFINE_bool(trigger_deadlock, false, "issue delete in range scan to trigger PrefixHashMap deadlock"); DEFINE_uint64(bucket_count, 100000, "number of buckets"); diff --git a/db/repair.cc b/db/repair.cc index 66aa95ae26..fc9ba282d6 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -196,7 +196,7 @@ class Repairer { std::string scratch; Slice record; WriteBatch batch; - MemTable* mem = new MemTable(icmp_, options_.memtable_factory, + MemTable* mem = new MemTable(icmp_, options_.memtable_factory.get(), options_.num_levels); mem->Ref(); int counter = 0; 
@@ -227,7 +227,7 @@ class Repairer { table_cache_, iter, &meta, icmp_.user_comparator(), 0, 0, true); delete iter; - mem->Unref(); + delete mem->Unref(); mem = nullptr; if (status.ok()) { if (meta.file_size > 0) { diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 4aba21ca42..ff9aa63eec 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -22,7 +22,7 @@ namespace rocksdb { static std::string PrintContents(WriteBatch* b) { InternalKeyComparator cmp(BytewiseComparator()); auto factory = std::make_shared(); - MemTable* mem = new MemTable(cmp, factory); + MemTable* mem = new MemTable(cmp, factory.get()); mem->Ref(); std::string state; Options options; @@ -69,7 +69,7 @@ static std::string PrintContents(WriteBatch* b) { } else if (count != WriteBatchInternal::Count(b)) { state.append("CountMismatch()"); } - mem->Unref(); + delete mem->Unref(); return state; } diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index cb55ac44f3..f24132a6fe 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -40,6 +40,16 @@ class CompactionFilter { // When the value is to be preserved, the application has the option // to modify the existing_value and pass it back through new_value. // value_changed needs to be set to true in this case. + // + // If multithreaded compaction is being used *and* a single CompactionFilter + // instance was supplied via Options::compaction_filter, this method may be + // called from different threads concurrently. The application must ensure + // that the call is thread-safe. + // + // If the CompactionFilter was created by a factory, then it will only ever + // be used by a single thread that is doing the compaction run, and this + // call does not need to be thread-safe. However, multiple filters may be + // in existence and operating concurrently. 
virtual bool Filter(int level, const Slice& key, const Slice& existing_value, diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 73f9ac4da3..7396f84454 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -273,7 +273,7 @@ class DB { // Sets iter to an iterator that is positioned at a write-batch containing // seq_number. If the sequence number is non existent, it returns an iterator // at the first available seq_no after the requested seq_no - // Returns Status::Ok if iterator is valid + // Returns Status::OK if iterator is valid // Must set WAL_ttl_seconds or WAL_size_limit_MB to large values to // use this api, else the WAL files will get // cleared aggressively and the iterator might keep getting invalid before @@ -292,6 +292,11 @@ class DB { std::vector *metadata) { } + // Sets the globally unique ID created at database creation time by invoking + // Env::GenerateUniqueId(), in identity. Returns Status::OK if identity could + // be set properly + virtual Status GetDbIdentity(std::string& identity) = 0; + private: // No copying allowed DB(const DB&); diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index 53a7f50649..30ea3cf41e 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -17,21 +17,13 @@ // The factory will be passed an Arena object when a new MemTableRep is // requested. The API for this object is in rocksdb/arena.h. // -// Users can implement their own memtable representations. We include four +// Users can implement their own memtable representations. We include three // types built in: // - SkipListRep: This is the default; it is backed by a skip list. -// - TransformRep: This is backed by an custom hash map. -// On construction, they are given a SliceTransform object. This -// object is applied to the user key of stored items which indexes into the -// hash map to yield a skiplist containing all records that share the same -// user key under the transform function. 
-// - UnsortedRep: A subclass of TransformRep where the transform function is -// the identity function. Optimized for point lookups. -// - PrefixHashRep: A subclass of TransformRep where the transform function is -// a fixed-size prefix extractor. If you use PrefixHashRepFactory, the transform -// must be identical to options.prefix_extractor, otherwise it will be discarded -// and the default will be used. It is optimized for ranged scans over a -// prefix. +// - HashSkipListRep: The memtable rep that is best used for keys that are +// structured like "prefix:suffix" where iteration within a prefix is +// common and iteration across different prefixes is rare. It is backed by +// a hash map where each bucket is a skip list. // - VectorRep: This is backed by an unordered std::vector. On iteration, the // vector is sorted. It is intelligent about sorting; once the MarkReadOnly() // has been called, the vector will only be sorted once. It is optimized for @@ -186,16 +178,14 @@ public: } }; -// TransformReps are backed by an unordered map of buffers to buckets. When -// looking up a key, the user key is extracted and a user-supplied transform -// function (see rocksdb/slice_transform.h) is applied to get the key into the -// unordered map. This allows the user to bin user keys based on arbitrary -// criteria. Two example implementations are UnsortedRepFactory and -// PrefixHashRepFactory. +// HashSkipListRep is backed by hash map of buckets. Each bucket is a skip +// list. All the keys with the same prefix will be in the same bucket. +// The prefix is determined using user supplied SliceTransform. It has +// to match prefix_extractor in options.prefix_extractor. // // Iteration over the entire collection is implemented by dumping all the keys -// into an std::set. Thus, these data structures are best used when iteration -// over the entire collection is rare. +// into a separate skip list.
Thus, these data structures are best used when +// iteration over the entire collection is rare. // // Parameters: // transform: The SliceTransform to bucket user keys on. TransformRepFactory diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index d5f671ebec..2befb05b97 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -15,11 +15,9 @@ #include #include -#include "rocksdb/memtablerep.h" #include "rocksdb/memtablerep.h" #include "rocksdb/slice.h" #include "rocksdb/slice_transform.h" -#include "rocksdb/slice_transform.h" #include "rocksdb/statistics.h" #include "rocksdb/table_properties.h" #include "rocksdb/universal_compaction.h" @@ -95,16 +93,33 @@ struct Options { // Default: nullptr shared_ptr merge_operator; - // The client must provide compaction_filter_factory if it requires a new - // compaction filter to be used for different compaction processes + // A single CompactionFilter instance to call into during compaction. // Allows an application to modify/delete a key-value during background // compaction. - // Ideally, client should specify only one of filter or factory. + // + // If the client requires a new compaction filter to be used for different + // compaction runs, it can specify compaction_filter_factory instead of this + // option. The client should specify only one of the two. // compaction_filter takes precedence over compaction_filter_factory if // client specifies both. + // + // If multithreaded compaction is being used, the supplied CompactionFilter + // instance may be used from different threads concurrently and so should be + // thread-safe. + // // Default: nullptr const CompactionFilter* compaction_filter; + // This is a factory that provides compaction filter objects which allow + // an application to modify/delete a key-value during background compaction. + // + // A new filter will be created on each compaction run. 
If multithreaded + // compaction is being used, each created CompactionFilter will only be used + // from a single thread and so does not need to be thread-safe. + // + // Default: a factory that doesn't provide any object + std::shared_ptr compaction_filter_factory; + // If true, the database will be created if it is missing. // Default: false bool create_if_missing; @@ -602,11 +617,6 @@ struct Options { // Table and TableBuilder. std::shared_ptr table_factory; - // This is a factory that provides compaction filter objects which allow - // an application to modify/delete a key-value during background compaction. - // Default: a factory that doesn't provide any object - std::shared_ptr compaction_filter_factory; - // This option allows user to to collect their own interested statistics of // the tables. // Default: emtpy vector -- no user-defined statistics collection will be diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 102a4be584..286a624c8e 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -51,6 +51,11 @@ enum Tickers { // # of times bloom filter has avoided file reads. BLOOM_FILTER_USEFUL, + // # of memtable hits. + MEMTABLE_HIT, + // # of memtable misses. + MEMTABLE_MISS, + /** * COMPACTION_KEY_DROP_* count the reasons for key drop during compaction * There are 3 reasons currently. 
@@ -125,6 +130,8 @@ const std::vector> TickersNameMap = { { BLOCK_CACHE_DATA_MISS, "rocksdb.block.cache.data.miss" }, { BLOCK_CACHE_DATA_HIT, "rocksdb.block.cache.data.hit" }, { BLOOM_FILTER_USEFUL, "rocksdb.bloom.filter.useful" }, + { MEMTABLE_HIT, "rocksdb.memtable.hit" }, + { MEMTABLE_MISS, "rocksdb.memtable.miss" }, { COMPACTION_KEY_DROP_NEWER_ENTRY, "rocksdb.compaction.key.drop.new" }, { COMPACTION_KEY_DROP_OBSOLETE, "rocksdb.compaction.key.drop.obsolete" }, { COMPACTION_KEY_DROP_USER, "rocksdb.compaction.key.drop.user" }, diff --git a/include/utilities/stackable_db.h b/include/utilities/stackable_db.h index dc26ed852e..e74bf353b4 100644 --- a/include/utilities/stackable_db.h +++ b/include/utilities/stackable_db.h @@ -140,6 +140,10 @@ class StackableDB : public DB { return db_->DeleteFile(name); } + virtual Status GetDbIdentity(std::string& identity) { + return db_->GetDbIdentity(identity); + } + virtual Status GetUpdatesSince(SequenceNumber seq_number, unique_ptr* iter) override { diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index a6dbe35196..61ac193c99 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -100,9 +100,10 @@ BlockBasedTableBuilder::BlockBasedTableBuilder( rep_->filter_block->StartBlock(0); } if (options.block_cache_compressed.get() != nullptr) { - BlockBasedTable::GenerateCachePrefix(options.block_cache_compressed, file, - &rep_->compressed_cache_key_prefix[0], - &rep_->compressed_cache_key_prefix_size); + BlockBasedTable::GenerateCachePrefix( + options.block_cache_compressed.get(), file, + &rep_->compressed_cache_key_prefix[0], + &rep_->compressed_cache_key_prefix_size); } } diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index e69bef6798..080daa5a7c 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -97,18 +97,18 @@ void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep) { 
rep->cache_key_prefix_size = 0; rep->compressed_cache_key_prefix_size = 0; if (rep->options.block_cache != nullptr) { - GenerateCachePrefix(rep->options.block_cache, rep->file.get(), + GenerateCachePrefix(rep->options.block_cache.get(), rep->file.get(), &rep->cache_key_prefix[0], &rep->cache_key_prefix_size); } if (rep->options.block_cache_compressed != nullptr) { - GenerateCachePrefix(rep->options.block_cache_compressed, rep->file.get(), - &rep->compressed_cache_key_prefix[0], + GenerateCachePrefix(rep->options.block_cache_compressed.get(), + rep->file.get(), &rep->compressed_cache_key_prefix[0], &rep->compressed_cache_key_prefix_size); } } -void BlockBasedTable::GenerateCachePrefix(shared_ptr cc, +void BlockBasedTable::GenerateCachePrefix(Cache* cc, RandomAccessFile* file, char* buffer, size_t* size) { // generate an id from the file @@ -122,7 +122,7 @@ void BlockBasedTable::GenerateCachePrefix(shared_ptr cc, } } -void BlockBasedTable::GenerateCachePrefix(shared_ptr cc, +void BlockBasedTable::GenerateCachePrefix(Cache* cc, WritableFile* file, char* buffer, size_t* size) { // generate an id from the file diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index 72fb35fa51..05811b5d34 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -167,9 +167,9 @@ class BlockBasedTable : public TableReader { rep_ = rep; } // Generate a cache key prefix from the file - static void GenerateCachePrefix(shared_ptr cc, + static void GenerateCachePrefix(Cache* cc, RandomAccessFile* file, char* buffer, size_t* size); - static void GenerateCachePrefix(shared_ptr cc, + static void GenerateCachePrefix(Cache* cc, WritableFile* file, char* buffer, size_t* size); // The longest prefix of the cache key used to identify blocks. 
diff --git a/table/table_test.cc b/table/table_test.cc index 394aa4b9d5..1f79fcdf9a 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -370,15 +370,15 @@ class MemTableConstructor: public Constructor { : Constructor(cmp), internal_comparator_(cmp), table_factory_(new SkipListFactory) { - memtable_ = new MemTable(internal_comparator_, table_factory_); + memtable_ = new MemTable(internal_comparator_, table_factory_.get()); memtable_->Ref(); } ~MemTableConstructor() { - memtable_->Unref(); + delete memtable_->Unref(); } virtual Status FinishImpl(const Options& options, const KVMap& data) { - memtable_->Unref(); - memtable_ = new MemTable(internal_comparator_, table_factory_); + delete memtable_->Unref(); + memtable_ = new MemTable(internal_comparator_, table_factory_.get()); memtable_->Ref(); int seq = 1; for (KVMap::const_iterator it = data.begin(); @@ -930,19 +930,19 @@ TEST(TableTest, NumBlockStat) { class BlockCacheProperties { public: - explicit BlockCacheProperties(std::shared_ptr statistics) { + explicit BlockCacheProperties(Statistics* statistics) { block_cache_miss = - statistics.get()->getTickerCount(BLOCK_CACHE_MISS); + statistics->getTickerCount(BLOCK_CACHE_MISS); block_cache_hit = - statistics.get()->getTickerCount(BLOCK_CACHE_HIT); + statistics->getTickerCount(BLOCK_CACHE_HIT); index_block_cache_miss = - statistics.get()->getTickerCount(BLOCK_CACHE_INDEX_MISS); + statistics->getTickerCount(BLOCK_CACHE_INDEX_MISS); index_block_cache_hit = - statistics.get()->getTickerCount(BLOCK_CACHE_INDEX_HIT); + statistics->getTickerCount(BLOCK_CACHE_INDEX_HIT); data_block_cache_miss = - statistics.get()->getTickerCount(BLOCK_CACHE_DATA_MISS); + statistics->getTickerCount(BLOCK_CACHE_DATA_MISS); data_block_cache_hit = - statistics.get()->getTickerCount(BLOCK_CACHE_DATA_HIT); + statistics->getTickerCount(BLOCK_CACHE_DATA_HIT); } // Check if the fetched props matches the expected ones. 
@@ -993,7 +993,7 @@ TEST(TableTest, BlockCacheTest) { // At first, no block will be accessed. { - BlockCacheProperties props(options.statistics); + BlockCacheProperties props(options.statistics.get()); // index will be added to block cache. props.AssertEqual( 1, // index block miss @@ -1006,7 +1006,7 @@ TEST(TableTest, BlockCacheTest) { // Only index block will be accessed { iter.reset(c.NewIterator()); - BlockCacheProperties props(options.statistics); + BlockCacheProperties props(options.statistics.get()); // NOTE: to help better highlight the "detla" of each ticker, I use // + to indicate the increment of changed // value; other numbers remain the same. @@ -1021,7 +1021,7 @@ TEST(TableTest, BlockCacheTest) { // Only data block will be accessed { iter->SeekToFirst(); - BlockCacheProperties props(options.statistics); + BlockCacheProperties props(options.statistics.get()); props.AssertEqual( 1, 1, @@ -1034,7 +1034,7 @@ TEST(TableTest, BlockCacheTest) { { iter.reset(c.NewIterator()); iter->SeekToFirst(); - BlockCacheProperties props(options.statistics); + BlockCacheProperties props(options.statistics.get()); props.AssertEqual( 1, 1 + 1, // index block hit @@ -1047,14 +1047,14 @@ TEST(TableTest, BlockCacheTest) { // -- PART 2: Open without block cache options.block_cache.reset(); - options.statistics = CreateDBStatistics(); // reset the props + options.statistics = CreateDBStatistics(); // reset the stats c.Reopen(options); { iter.reset(c.NewIterator()); iter->SeekToFirst(); ASSERT_EQ("key", iter->key().ToString()); - BlockCacheProperties props(options.statistics); + BlockCacheProperties props(options.statistics.get()); // Nothing is affected at all props.AssertEqual(0, 0, 0, 0); } @@ -1065,7 +1065,7 @@ TEST(TableTest, BlockCacheTest) { options.block_cache = NewLRUCache(1); c.Reopen(options); { - BlockCacheProperties props(options.statistics); + BlockCacheProperties props(options.statistics.get()); props.AssertEqual( 1, // index block miss 0, @@ -1080,7 +1080,7 @@ 
TEST(TableTest, BlockCacheTest) { // It first cache index block then data block. But since the cache size // is only 1, index block will be purged after data block is inserted. iter.reset(c.NewIterator()); - BlockCacheProperties props(options.statistics); + BlockCacheProperties props(options.statistics.get()); props.AssertEqual( 1 + 1, // index block miss 0, @@ -1093,7 +1093,7 @@ TEST(TableTest, BlockCacheTest) { // SeekToFirst() accesses data block. With similar reason, we expect data // block's cache miss. iter->SeekToFirst(); - BlockCacheProperties props(options.statistics); + BlockCacheProperties props(options.statistics.get()); props.AssertEqual( 2, 0, @@ -1268,7 +1268,7 @@ class MemTableTest { }; TEST(MemTableTest, Simple) { InternalKeyComparator cmp(BytewiseComparator()); auto table_factory = std::make_shared(); - MemTable* memtable = new MemTable(cmp, table_factory); + MemTable* memtable = new MemTable(cmp, table_factory.get()); memtable->Ref(); WriteBatch batch; Options options; @@ -1289,7 +1289,7 @@ TEST(MemTableTest, Simple) { } delete iter; - memtable->Unref(); + delete memtable->Unref(); } diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 71e36e901c..966f007e83 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -305,8 +305,7 @@ DEFINE_bool(filter_deletes, false, "On true, deletes use KeyMayExist to drop" enum RepFactory { kSkipList, - kPrefixHash, - kUnsorted, + kHashSkipList, kVectorRep }; enum RepFactory StringToRepFactory(const char* ctype) { @@ -315,9 +314,7 @@ enum RepFactory StringToRepFactory(const char* ctype) { if (!strcasecmp(ctype, "skip_list")) return kSkipList; else if (!strcasecmp(ctype, "prefix_hash")) - return kPrefixHash; - else if (!strcasecmp(ctype, "unsorted")) - return kUnsorted; + return kHashSkipList; else if (!strcasecmp(ctype, "vector")) return kVectorRep; @@ -335,7 +332,7 @@ static bool ValidatePrefixSize(const char* flagname, int32_t value) { } return true; } -DEFINE_int32(prefix_size, 0, "Control the prefix 
size for PrefixHashRep"); +DEFINE_int32(prefix_size, 0, "Control the prefix size for HashSkipListRep"); static const bool FLAGS_prefix_size_dummy = google::RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize); @@ -1338,12 +1335,9 @@ class StressTest { case kSkipList: memtablerep = "skip_list"; break; - case kPrefixHash: + case kHashSkipList: memtablerep = "prefix_hash"; break; - case kUnsorted: - memtablerep = "unsorted"; - break; case kVectorRep: memtablerep = "vector"; break; @@ -1393,21 +1387,15 @@ class StressTest { FLAGS_delete_obsolete_files_period_micros; options.max_manifest_file_size = 1024; options.filter_deletes = FLAGS_filter_deletes; - if ((FLAGS_prefix_size == 0) == (FLAGS_rep_factory == kPrefixHash)) { + if ((FLAGS_prefix_size == 0) == (FLAGS_rep_factory == kHashSkipList)) { fprintf(stderr, "prefix_size should be non-zero iff memtablerep == prefix_hash\n"); exit(1); } switch (FLAGS_rep_factory) { - case kPrefixHash: - options.memtable_factory.reset( - new PrefixHashRepFactory(NewFixedPrefixTransform(FLAGS_prefix_size)) - ); - break; - case kUnsorted: - options.memtable_factory.reset( - new UnsortedRepFactory() - ); + case kHashSkipList: + options.memtable_factory.reset(NewHashSkipListRepFactory( + NewFixedPrefixTransform(FLAGS_prefix_size))); break; case kSkipList: // no need to do anything diff --git a/util/hash_skiplist_rep.cc b/util/hash_skiplist_rep.cc index bcc459f662..3ca6835bd5 100644 --- a/util/hash_skiplist_rep.cc +++ b/util/hash_skiplist_rep.cc @@ -4,6 +4,8 @@ // of patent rights can be found in the PATENTS file in the same directory. 
// +#include "util/hash_skiplist_rep.h" + #include "rocksdb/memtablerep.h" #include "rocksdb/arena.h" #include "rocksdb/slice.h" @@ -309,39 +311,12 @@ std::shared_ptr } // anon namespace -class HashSkipListRepFactory : public MemTableRepFactory { - public: - explicit HashSkipListRepFactory( - const SliceTransform* transform, - size_t bucket_count, - int32_t skiplist_height, - int32_t skiplist_branching_factor) - : transform_(transform), - bucket_count_(bucket_count), - skiplist_height_(skiplist_height), - skiplist_branching_factor_(skiplist_branching_factor) { } - - virtual ~HashSkipListRepFactory() { delete transform_; } - - virtual std::shared_ptr CreateMemTableRep( - MemTableRep::KeyComparator& compare, Arena* arena) override { - return std::make_shared(compare, arena, transform_, - bucket_count_, skiplist_height_, - skiplist_branching_factor_); - } - - virtual const char* Name() const override { - return "HashSkipListRepFactory"; - } - - const SliceTransform* GetTransform() { return transform_; } - - private: - const SliceTransform* transform_; - const size_t bucket_count_; - const int32_t skiplist_height_; - const int32_t skiplist_branching_factor_; -}; +std::shared_ptr HashSkipListRepFactory::CreateMemTableRep( + MemTableRep::KeyComparator& compare, Arena* arena) { + return std::make_shared(compare, arena, transform_, + bucket_count_, skiplist_height_, + skiplist_branching_factor_); +} MemTableRepFactory* NewHashSkipListRepFactory( const SliceTransform* transform, size_t bucket_count, diff --git a/util/hash_skiplist_rep.h b/util/hash_skiplist_rep.h new file mode 100644 index 0000000000..3411f66b15 --- /dev/null +++ b/util/hash_skiplist_rep.h @@ -0,0 +1,45 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. 
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include "rocksdb/slice_transform.h" +#include "rocksdb/memtablerep.h" + +namespace rocksdb { + +class HashSkipListRepFactory : public MemTableRepFactory { + public: + explicit HashSkipListRepFactory( + const SliceTransform* transform, + size_t bucket_count, + int32_t skiplist_height, + int32_t skiplist_branching_factor) + : transform_(transform), + bucket_count_(bucket_count), + skiplist_height_(skiplist_height), + skiplist_branching_factor_(skiplist_branching_factor) { } + + virtual ~HashSkipListRepFactory() { delete transform_; } + + virtual std::shared_ptr CreateMemTableRep( + MemTableRep::KeyComparator& compare, Arena* arena) override; + + virtual const char* Name() const override { + return "HashSkipListRepFactory"; + } + + const SliceTransform* GetTransform() { return transform_; } + + private: + const SliceTransform* transform_; + const size_t bucket_count_; + const int32_t skiplist_height_; + const int32_t skiplist_branching_factor_; +}; + +} diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index e255f81966..58d81460e9 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -1226,25 +1226,41 @@ void ChangeCompactionStyleCommand::DoCommand() { class InMemoryHandler : public WriteBatch::Handler { public: + InMemoryHandler(stringstream& row, bool print_values) : Handler(),row_(row) { + print_values_ = print_values; + } + + void commonPutMerge(const Slice& key, const Slice& value) { + string k = LDBCommand::StringToHex(key.ToString()); + if (print_values_) { + string v = LDBCommand::StringToHex(value.ToString()); + row_ << k << " : "; + row_ << v << " "; + } else { + row_ << k << " "; + } + } virtual void Put(const Slice& key, const Slice& value) { - putMap_[key.ToString()] = value.ToString(); + row_ << "PUT : "; + 
commonPutMerge(key, value); } + + virtual void Merge(const Slice& key, const Slice& value) { + row_ << "MERGE : "; + commonPutMerge(key, value); + } + virtual void Delete(const Slice& key) { - deleteList_.push_back(key.ToString(true)); + row_ <<",DELETE : "; + row_ << LDBCommand::StringToHex(key.ToString()) << " "; } + virtual ~InMemoryHandler() { }; - map PutMap() { - return putMap_; - } - vector DeleteList() { - return deleteList_; - } - private: - map putMap_; - vector deleteList_; + stringstream & row_; + bool print_values_; }; const string WALDumperCommand::ARG_WAL_FILE = "walfile"; @@ -1322,26 +1338,8 @@ void WALDumperCommand::DoCommand() { row<( + new DefaultCompactionFilterFactory())), create_if_missing(false), error_if_exists(false), paranoid_checks(false), @@ -97,9 +100,6 @@ Options::Options() memtable_factory(std::shared_ptr(new SkipListFactory)), table_factory( std::shared_ptr(new BlockBasedTableFactory())), - compaction_filter_factory( - std::shared_ptr( - new DefaultCompactionFilterFactory())), inplace_update_support(false), inplace_update_num_locks(10000) { assert(memtable_factory.get() != nullptr); @@ -278,6 +278,9 @@ Options::Dump(Logger* log) const Log(log,"Options.compaction_options_universal." 
"max_size_amplification_percent: %u", compaction_options_universal.max_size_amplification_percent); + Log(log, + "Options.compaction_options_universal.compression_size_percent: %u", + compaction_options_universal.compression_size_percent); std::string collector_names; for (auto collector : table_properties_collectors) { collector_names.append(collector->Name()); diff --git a/util/stl_wrappers.h b/util/stl_wrappers.h index b42a584277..b4c14b4ba3 100644 --- a/util/stl_wrappers.h +++ b/util/stl_wrappers.h @@ -28,24 +28,5 @@ namespace stl_wrappers { } }; - struct Hash { - inline size_t operator()(const char* buf) const { - Slice internal_key = GetLengthPrefixedSlice(buf); - Slice value = - GetLengthPrefixedSlice(internal_key.data() + internal_key.size()); - unsigned int hval = MurmurHash(internal_key.data(), internal_key.size(), - 0); - hval = MurmurHash(value.data(), value.size(), hval); - return hval; - } - }; - - struct KeyEqual : private Base { - explicit KeyEqual(const MemTableRep::KeyComparator& compare) - : Base(compare) { } - inline bool operator()(const char* a, const char* b) const { - return this->compare_(a, b) == 0; - } - }; } } diff --git a/util/transformrep.cc b/util/transformrep.cc deleted file mode 100644 index ef12055702..0000000000 --- a/util/transformrep.cc +++ /dev/null @@ -1,426 +0,0 @@ -// Copyright (c) 2013, Facebook, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. 
-// -#include -#include -#include -#include -#include - -#include "rocksdb/memtablerep.h" -#include "rocksdb/arena.h" -#include "rocksdb/slice.h" -#include "rocksdb/slice_transform.h" -#include "db/memtable.h" -#include "port/port.h" -#include "util/mutexlock.h" -#include "util/murmurhash.h" -#include "util/stl_wrappers.h" - -namespace std { -template <> -struct hash { - size_t operator()(const rocksdb::Slice& slice) const { - return MurmurHash(slice.data(), slice.size(), 0); - } -}; -} - -namespace rocksdb { -namespace { - -using namespace stl_wrappers; - -class TransformRep : public MemTableRep { - public: - TransformRep(const KeyComparator& compare, Arena* arena, - const SliceTransform* transform, size_t bucket_size, - size_t num_locks); - - virtual void Insert(const char* key) override; - - virtual bool Contains(const char* key) const override; - - virtual size_t ApproximateMemoryUsage() override; - - virtual ~TransformRep() { } - - virtual std::shared_ptr GetIterator() override; - - virtual std::shared_ptr GetIterator( - const Slice& slice) override; - - virtual std::shared_ptr GetDynamicPrefixIterator() - override { - return std::make_shared(*this); - } - - std::shared_ptr GetTransformIterator( - const Slice& transformed); - - private: - friend class DynamicPrefixIterator; - typedef std::set Bucket; - typedef std::unordered_map> BucketMap; - - // Maps slices (which are transformed user keys) to buckets of keys sharing - // the same transform. - BucketMap buckets_; - - // rwlock_ protects access to the buckets_ data structure itself. Each bucket - // has its own read-write lock as well. - mutable port::RWMutex rwlock_; - - // Keep track of approximately how much memory is being used. - size_t memory_usage_ = 0; - - // The user-supplied transform whose domain is the user keys. - const SliceTransform* transform_; - - // Get a bucket from buckets_. If the bucket hasn't been initialized yet, - // initialize it before returning. Must be externally synchronized. 
- std::shared_ptr& GetBucket(const Slice& transformed); - - port::RWMutex* GetLock(const Slice& transformed) const; - - mutable std::vector locks_; - - const KeyComparator& compare_; - - class Iterator : public MemTableRep::Iterator { - public: - explicit Iterator(std::shared_ptr items); - - virtual ~Iterator() { }; - - // Returns true iff the iterator is positioned at a valid node. - virtual bool Valid() const; - - // Returns the key at the current position. - // REQUIRES: Valid() - virtual const char* key() const; - - // Advances to the next position. - // REQUIRES: Valid() - virtual void Next(); - - // Advances to the previous position. - // REQUIRES: Valid() - virtual void Prev(); - - // Advance to the first entry with a key >= target - virtual void Seek(const Slice& user_key, const char* memtable_key); - - // Position at the first entry in collection. - // Final state of iterator is Valid() iff collection is not empty. - virtual void SeekToFirst(); - - // Position at the last entry in collection. - // Final state of iterator is Valid() iff collection is not empty. - virtual void SeekToLast(); - private: - std::shared_ptr items_; - Bucket::const_iterator cit_; - std::string tmp_; // For passing to EncodeKey - }; - - class EmptyIterator : public MemTableRep::Iterator { - // This is used when there wasn't a bucket. It is cheaper than - // instantiating an empty bucket over which to iterate. 
- public: - virtual bool Valid() const { - return false; - } - virtual const char* key() const { - assert(false); - return nullptr; - } - virtual void Next() { } - virtual void Prev() { } - virtual void Seek(const Slice& user_key, const char* memtable_key) { } - virtual void SeekToFirst() { } - virtual void SeekToLast() { } - static std::shared_ptr GetInstance(); - private: - static std::shared_ptr instance; - EmptyIterator() { } - }; - - class TransformIterator : public Iterator { - public: - explicit TransformIterator(std::shared_ptr items, - port::RWMutex* rwlock); - virtual ~TransformIterator() { } - private: - const ReadLock l_; - }; - - - class DynamicPrefixIterator : public MemTableRep::Iterator { - private: - // the underlying memtable rep - const TransformRep& memtable_rep_; - // the result of a prefix seek - std::unique_ptr bucket_iterator_; - - public: - explicit DynamicPrefixIterator(const TransformRep& memtable_rep) - : memtable_rep_(memtable_rep) {} - - virtual ~DynamicPrefixIterator() { }; - - // Returns true iff the iterator is positioned at a valid node. - virtual bool Valid() const { - return bucket_iterator_ && bucket_iterator_->Valid(); - } - - // Returns the key at the current position. - // REQUIRES: Valid() - virtual const char* key() const { - assert(Valid()); - return bucket_iterator_->key(); - } - - // Advances to the next position. - // REQUIRES: Valid() - virtual void Next() { - assert(Valid()); - bucket_iterator_->Next(); - } - - // Advances to the previous position. 
- // REQUIRES: Valid() - virtual void Prev() { - assert(Valid()); - bucket_iterator_->Prev(); - } - - // Advance to the first entry with a key >= target within the - // same bucket as target - virtual void Seek(const Slice& user_key, const char* memtable_key) { - Slice prefix = memtable_rep_.transform_->Transform(user_key); - - ReadLock l(&memtable_rep_.rwlock_); - auto bucket = memtable_rep_.buckets_.find(prefix); - if (bucket == memtable_rep_.buckets_.end()) { - bucket_iterator_.reset(nullptr); - } else { - bucket_iterator_.reset( - new TransformIterator(bucket->second, memtable_rep_.GetLock(prefix))); - bucket_iterator_->Seek(user_key, memtable_key); - } - } - - // Position at the first entry in collection. - // Final state of iterator is Valid() iff collection is not empty. - virtual void SeekToFirst() { - // Prefix iterator does not support total order. - // We simply set the iterator to invalid state - bucket_iterator_.reset(nullptr); - } - - // Position at the last entry in collection. - // Final state of iterator is Valid() iff collection is not empty. - virtual void SeekToLast() { - // Prefix iterator does not support total order. 
- // We simply set the iterator to invalid state - bucket_iterator_.reset(nullptr); - } - }; -}; - -class PrefixHashRep : public TransformRep { - public: - PrefixHashRep(const KeyComparator& compare, Arena* arena, - const SliceTransform* transform, size_t bucket_size, - size_t num_locks) - : TransformRep(compare, arena, transform, - bucket_size, num_locks) { } - - virtual std::shared_ptr GetPrefixIterator( - const Slice& prefix) override; -}; - -std::shared_ptr& TransformRep::GetBucket( - const Slice& transformed) { - WriteLock l(&rwlock_); - auto& bucket = buckets_[transformed]; - if (!bucket) { - bucket.reset( - new decltype(buckets_)::mapped_type::element_type(Compare(compare_))); - // To memory_usage_ we add the size of the std::set and the size of the - // std::pair (decltype(buckets_)::value_type) which includes the - // Slice and the std::shared_ptr - memory_usage_ += sizeof(*bucket) + - sizeof(decltype(buckets_)::value_type); - } - return bucket; -} - -port::RWMutex* TransformRep::GetLock(const Slice& transformed) const { - return &locks_[std::hash()(transformed) % locks_.size()]; -} - -TransformRep::TransformRep(const KeyComparator& compare, Arena* arena, - const SliceTransform* transform, size_t bucket_size, - size_t num_locks) - : buckets_(bucket_size), - transform_(transform), - locks_(num_locks), - compare_(compare) { } - -void TransformRep::Insert(const char* key) { - assert(!Contains(key)); - auto transformed = transform_->Transform(UserKey(key)); - auto& bucket = GetBucket(transformed); - WriteLock bl(GetLock(transformed)); - bucket->insert(key); - memory_usage_ += sizeof(key); -} - -bool TransformRep::Contains(const char* key) const { - ReadLock l(&rwlock_); - auto transformed = transform_->Transform(UserKey(key)); - auto bucket = buckets_.find(transformed); - if (bucket == buckets_.end()) { - return false; - } - ReadLock bl(GetLock(transformed)); - return bucket->second->count(key) != 0; -} - -size_t TransformRep::ApproximateMemoryUsage() { - 
return memory_usage_; -} - -std::shared_ptr - TransformRep::EmptyIterator::GetInstance() { - if (!instance) { - instance.reset(new TransformRep::EmptyIterator); - } - return instance; -} - -TransformRep::Iterator::Iterator(std::shared_ptr items) - : items_(items), - cit_(items_->begin()) { } - -// Returns true iff the iterator is positioned at a valid node. -bool TransformRep::Iterator::Valid() const { - return cit_ != items_->end(); -} - -// Returns the key at the current position. -// REQUIRES: Valid() -const char* TransformRep::Iterator::key() const { - assert(Valid()); - return *cit_; -} - -// Advances to the next position. -// REQUIRES: Valid() -void TransformRep::Iterator::Next() { - assert(Valid()); - if (cit_ == items_->end()) { - return; - } - ++cit_; -} - -// Advances to the previous position. -// REQUIRES: Valid() -void TransformRep::Iterator::Prev() { - assert(Valid()); - if (cit_ == items_->begin()) { - // If you try to go back from the first element, the iterator should be - // invalidated. So we set it to past-the-end. This means that you can - // treat the container circularly. - cit_ = items_->end(); - } else { - --cit_; - } -} - -// Advance to the first entry with a key >= target -void TransformRep::Iterator::Seek(const Slice& user_key, - const char* memtable_key) { - const char* encoded_key = - (memtable_key != nullptr) ? memtable_key : EncodeKey(&tmp_, user_key); - cit_ = items_->lower_bound(encoded_key); -} - -// Position at the first entry in collection. -// Final state of iterator is Valid() iff collection is not empty. 
-void TransformRep::Iterator::SeekToFirst() { - cit_ = items_->begin(); -} - -void TransformRep::Iterator::SeekToLast() { - cit_ = items_->end(); - if (items_->size() != 0) { - --cit_; - } -} - -TransformRep::TransformIterator::TransformIterator( - std::shared_ptr items, port::RWMutex* rwlock) - : Iterator(items), l_(rwlock) { } - -std::shared_ptr TransformRep::GetIterator() { - auto items = std::make_shared(Compare(compare_)); - // Hold read locks on all locks - ReadLock l(&rwlock_); - std::for_each(locks_.begin(), locks_.end(), [] (port::RWMutex& lock) { - lock.ReadLock(); - }); - for (auto& bucket : buckets_) { - items->insert(bucket.second->begin(), bucket.second->end()); - } - std::for_each(locks_.begin(), locks_.end(), [] (port::RWMutex& lock) { - lock.Unlock(); - }); - return std::make_shared(std::move(items)); -} - -std::shared_ptr TransformRep::GetTransformIterator( - const Slice& transformed) { - ReadLock l(&rwlock_); - auto bucket = buckets_.find(transformed); - if (bucket == buckets_.end()) { - return EmptyIterator::GetInstance(); - } - return std::make_shared(bucket->second, - GetLock(transformed)); -} - -std::shared_ptr TransformRep::GetIterator( - const Slice& slice) { - auto transformed = transform_->Transform(slice); - return GetTransformIterator(transformed); -} - -std::shared_ptr - TransformRep::EmptyIterator::instance; - -} // anon namespace - -std::shared_ptr TransformRepFactory::CreateMemTableRep( - MemTableRep::KeyComparator& compare, Arena* arena) { - return std::make_shared(compare, arena, transform_, - bucket_count_, num_locks_); -} - -std::shared_ptr PrefixHashRepFactory::CreateMemTableRep( - MemTableRep::KeyComparator& compare, Arena* arena) { - return std::make_shared(compare, arena, transform_, - bucket_count_, num_locks_); -} - -std::shared_ptr PrefixHashRep::GetPrefixIterator( - const Slice& prefix) { - return TransformRep::GetTransformIterator(prefix); -} - -} // namespace rocksdb diff --git a/utilities/ttl/db_ttl.cc 
b/utilities/ttl/db_ttl.cc index abe7408a69..ee4a948b9d 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -291,6 +291,10 @@ Status DBWithTTL::DeleteFile(std::string name) { return db_->DeleteFile(name); } +Status DBWithTTL::GetDbIdentity(std::string& identity) { + return db_->GetDbIdentity(identity); +} + Status DBWithTTL::GetUpdatesSince( SequenceNumber seq_number, unique_ptr* iter) { diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index d09bae9661..c5270764e0 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -84,6 +84,8 @@ class DBWithTTL : public StackableDB { virtual Status DeleteFile(std::string name); + virtual Status GetDbIdentity(std::string& identity); + virtual SequenceNumber GetLatestSequenceNumber() const; virtual Status GetUpdatesSince(SequenceNumber seq_number,