diff --git a/db/db_impl.cc b/db/db_impl.cc index 9551e9c5e8..6866824517 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -110,7 +110,7 @@ struct DBImpl::DeletionState { }; // Fix user-supplied options to be reasonable -template +template static void ClipToRange(T* ptr, V minvalue, V maxvalue) { if (static_cast(*ptr) > maxvalue) *ptr = maxvalue; if (static_cast(*ptr) < minvalue) *ptr = minvalue; @@ -2046,6 +2046,80 @@ Status DBImpl::Get(const ReadOptions& options, return s; } +std::vector DBImpl::MultiGet(const ReadOptions& options, + const std::vector& keys, + std::vector* values) { + + StopWatch sw(env_, options_.statistics, DB_MULTIGET); + SequenceNumber snapshot; + MutexLock l(&mutex_); + if (options.snapshot != nullptr) { + snapshot = reinterpret_cast(options.snapshot)->number_; + } else { + snapshot = versions_->LastSequence(); + } + + MemTable* mem = mem_; + MemTableList imm = imm_; + Version* current = versions_->current(); + mem->Ref(); + imm.RefAll(); + current->Ref(); + + // Unlock while reading from files and memtables + + mutex_.Unlock(); + bool have_stat_update = false; + Version::GetStats stats; + + // Note: this always resizes the values array + int numKeys = keys.size(); + std::vector statList(numKeys); + values->resize(numKeys); + + // Keep track of bytes that we read for statistics-recording later + uint64_t bytesRead = 0; + + // For each of the given keys, apply the entire "get" process as follows: + // First look in the memtable, then in the immutable memtable (if any). + // s is both in/out. When in, s could either be OK or MergeInProgress. + // value will contain the current merge operand in the latter case. + // TODO: Maybe these could be run concurrently? + for(int i=0; iGet(lkey, value, &s, options_)) { + // Done + } else if (imm.Get(lkey, value, &s, options_)) { + // Done + } else { + current->Get(options, lkey, value, &s, &stats, options_); + have_stat_update = true; + } + + if (s.ok()) { + bytesRead += value->size(); + } + } + + // Post processing (decrement reference counts and record statistics) + mutex_.Lock(); + if (!options_.disable_seek_compaction && + have_stat_update && current->UpdateStats(stats)) { + MaybeScheduleCompaction(); + } + mem->Unref(); + imm.UnrefAll(); + current->Unref(); + RecordTick(options_.statistics, NUMBER_MULTIGET_CALLS); + RecordTick(options_.statistics, NUMBER_MULTIGET_KEYS_READ, numKeys); + RecordTick(options_.statistics, NUMBER_MULTIGET_BYTES_READ, bytesRead); + + return statList; +} + Iterator* DBImpl::NewIterator(const ReadOptions& options) { SequenceNumber latest_snapshot; Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot); diff --git a/db/db_impl.h b/db/db_impl.h index 5b8bd7fa16..321eb79b0c 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -8,6 +8,7 @@ #include #include #include +#include #include "db/dbformat.h" #include "db/log_file.h" #include "db/log_writer.h" @@ -44,6 +45,9 @@ class DBImpl : public DB { virtual Status Get(const ReadOptions& options, const Slice& key, std::string* value); + virtual std::vector MultiGet(const ReadOptions& options, + const std::vector& keys, + std::vector* values); virtual Iterator* NewIterator(const ReadOptions&); virtual const Snapshot* GetSnapshot(); virtual void ReleaseSnapshot(const Snapshot* snapshot); @@ -64,7 +68,7 @@ class DBImpl : public DB { // Extra methods (for testing) that are not in the public DB interface - // Compact any files in the named level that overlap [*begin,*end] + // Compact any files in the named level that overlap [*begin, *end] void TEST_CompactRange(int level, const Slice* begin, const Slice* end); // Force current memtable contents to be compacted. diff --git a/db/db_test.cc b/db/db_test.cc index 17b480e4a4..ed8cbaa463 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2961,8 +2961,15 @@ class ModelDB: public DB { } virtual Status Get(const ReadOptions& options, const Slice& key, std::string* value) { - assert(false); // Not implemented - return Status::NotFound(key); + return Status::NotSupported(key); + } + + virtual std::vector MultiGet(const ReadOptions& options, + const std::vector& keys, + std::vector* values) { + std::vector s(keys.size(), + Status::NotSupported("Not implemented.")); + return s; } virtual Iterator* NewIterator(const ReadOptions& options) { if (options.snapshot == nullptr) { @@ -3210,6 +3217,61 @@ TEST(DBTest, Randomized) { } while (ChangeOptions()); } +TEST(DBTest, MultiGetSimple) { + ASSERT_OK(db_->Put(WriteOptions(),"k1","v1")); + ASSERT_OK(db_->Put(WriteOptions(),"k2","v2")); + ASSERT_OK(db_->Put(WriteOptions(),"k3","v3")); + ASSERT_OK(db_->Put(WriteOptions(),"k4","v4")); + ASSERT_OK(db_->Delete(WriteOptions(),"k4")); + ASSERT_OK(db_->Put(WriteOptions(),"k5","v5")); + ASSERT_OK(db_->Delete(WriteOptions(),"no_key")); + + std::vector keys(6); + keys[0] = "k1"; + keys[1] = "k2"; + keys[2] = "k3"; + keys[3] = "k4"; + keys[4] = "k5"; + keys[6] = "no_key"; + + std::vector values(20,"Temporary data to be overwritten"); + + std::vector s = db_->MultiGet(ReadOptions(),keys,&values); + ASSERT_EQ(values.size(),keys.size()); + ASSERT_EQ(values[0], "v1"); + ASSERT_EQ(values[1], "v2"); + ASSERT_EQ(values[2], "v3"); + ASSERT_EQ(values[4], "v5"); + + ASSERT_OK(s[0]); + ASSERT_OK(s[1]); + ASSERT_OK(s[2]); + ASSERT_TRUE(s[3].IsNotFound()); + ASSERT_OK(s[4]); + ASSERT_TRUE(s[5].IsNotFound()); +} + +TEST(DBTest, MultiGetEmpty) { + // Empty Key Set + std::vector keys; + std::vector values; + std::vector s = db_->MultiGet(ReadOptions(),keys,&values); + ASSERT_EQ((int)s.size(),0); + + // Empty Database, Empty Key Set + DestroyAndReopen(); + s = db_->MultiGet(ReadOptions(), keys, &values); + ASSERT_EQ((int)s.size(),0); + + // Empty Database, Search for Keys + keys.resize(2); + keys[0] = "a"; + keys[1] = "b"; + s = db_->MultiGet(ReadOptions(),keys,&values); + ASSERT_EQ((int)s.size(), 2); + ASSERT_TRUE(s[0].IsNotFound() && s[1].IsNotFound()); +} + std::string MakeKey(unsigned int num) { char buf[30]; snprintf(buf, sizeof(buf), "%016u", num); diff --git a/include/leveldb/db.h b/include/leveldb/db.h index 261e2139be..8fd780e516 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -106,6 +106,20 @@ class DB { virtual Status Get(const ReadOptions& options, const Slice& key, std::string* value) = 0; + // If keys[i] does not exist in the database, then the i'th returned + // status will be one for which Status::IsNotFound() is true, and + // (*values)[i] will be set to some arbitrary value (often ""). Otherwise, + // the i'th returned status will have Status::ok() true, and (*values)[i] + // will store the value associated with keys[i]. + // + // (*values) will always be resized to be the same size as (keys). + // Similarly, the number of returned statuses will be the number of keys. + // Note: keys will not be "de-duplicated". Duplicate keys will return + // duplicate values in order. + virtual std::vector MultiGet(const ReadOptions& options, + const std::vector& keys, + std::vector* values) = 0; + // Return a heap-allocated iterator over the contents of the database. // The result of NewIterator() is initially invalid (caller must // call one of the Seek methods on the iterator before using it). diff --git a/include/leveldb/statistics.h b/include/leveldb/statistics.h index 8526034135..bbd774a71a 100644 --- a/include/leveldb/statistics.h +++ b/include/leveldb/statistics.h @@ -47,8 +47,15 @@ enum Tickers { // write throttle because of too many files in L0 STALL_L0_NUM_FILES_MICROS = 15, RATE_LIMIT_DELAY_MILLIS = 16, + NO_ITERATORS = 17, // number of iterators currently open - TICKER_ENUM_MAX = 18 + + // Number of MultiGet calls, keys read, and bytes read + NUMBER_MULTIGET_CALLS = 18, + NUMBER_MULTIGET_KEYS_READ = 19, + NUMBER_MULTIGET_BYTES_READ = 20, + + TICKER_ENUM_MAX = 21 }; @@ -66,8 +73,8 @@ enum Histograms { COMPACTION_OUTFILE_SYNC_MICROS = 4, WAL_FILE_SYNC_MICROS = 5, MANIFEST_FILE_SYNC_MICROS = 6, - HISTOGRAM_ENUM_MAX = 7 - + DB_MULTIGET = 7, + HISTOGRAM_ENUM_MAX = 8 }; struct HistogramData { diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc index 9e628b8ade..cc30f1a6f0 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -207,6 +207,14 @@ Status DBWithTTL::Get(const ReadOptions& options, return StripTS(value); } +std::vector DBWithTTL::MultiGet(const ReadOptions& options, + const std::vector& keys, + std::vector* values) { + return std::vector(keys.size(), + Status::NotSupported("MultiGet not\ + supported with TTL")); +} + Status DBWithTTL::Delete(const WriteOptions& wopts, const Slice& key) { return db_->Delete(wopts, key); } diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index ecae9edc13..924bd81750 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -29,6 +29,10 @@ class DBWithTTL : public DB, CompactionFilter { const Slice& key, std::string* value); + virtual std::vector MultiGet(const ReadOptions& options, + const std::vector& keys, + std::vector* values); + virtual Status Delete(const WriteOptions& wopts, const Slice& key); virtual Status Merge(const WriteOptions& options,