diff --git a/db/db_impl.cc b/db/db_impl.cc
index 326abc8dbf..38285e0303 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -3179,18 +3179,34 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
 Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
                                       ColumnFamilyData* cfd,
-                                      SuperVersion* super_version) {
-  std::vector<Iterator*> iterator_list;
-  // Collect iterator for mutable mem
-  iterator_list.push_back(super_version->mem->NewIterator(options));
-  // Collect all needed child iterators for immutable memtables
-  super_version->imm->AddIterators(options, &iterator_list);
-  // Collect iterators for files in L0 - Ln
-  super_version->current->AddIterators(options, storage_options_,
-                                       &iterator_list);
-  Iterator* internal_iter = NewMergingIterator(
-      &cfd->internal_comparator(), &iterator_list[0], iterator_list.size());
-
+                                      SuperVersion* super_version,
+                                      Arena* arena) {
+  Iterator* internal_iter;
+  if (arena != nullptr) {
+    // Need to create internal iterator from the arena.
+    MergeIteratorBuilder merge_iter_builder(&cfd->internal_comparator(), arena);
+    // Collect iterator for mutable mem
+    merge_iter_builder.AddIterator(
+        super_version->mem->NewIterator(options, false, arena));
+    // Collect all needed child iterators for immutable memtables
+    super_version->imm->AddIterators(options, &merge_iter_builder);
+    // Collect iterators for files in L0 - Ln
+    super_version->current->AddIterators(options, storage_options_,
+                                         &merge_iter_builder);
+    internal_iter = merge_iter_builder.Finish();
+  } else {
+    // Need to create internal iterator using malloc.
+    std::vector<Iterator*> iterator_list;
+    // Collect iterator for mutable mem
+    iterator_list.push_back(super_version->mem->NewIterator(options));
+    // Collect all needed child iterators for immutable memtables
+    super_version->imm->AddIterators(options, &iterator_list);
+    // Collect iterators for files in L0 - Ln
+    super_version->current->AddIterators(options, storage_options_,
+                                         &iterator_list);
+    internal_iter = NewMergingIterator(&cfd->internal_comparator(),
+                                       &iterator_list[0], iterator_list.size());
+  }
   IterState* cleanup = new IterState(this, &mutex_, super_version);
   internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
@@ -3541,34 +3557,77 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options,
   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
   auto cfd = cfh->cfd();
-  Iterator* iter;
   if (options.tailing) {
 #ifdef ROCKSDB_LITE
     // not supported in lite version
     return nullptr;
 #else
     // TODO(ljin): remove tailing iterator
-    iter = new ForwardIterator(env_, this, options, cfd);
-    iter = NewDBIterator(env_, *cfd->options(),
-                         cfd->user_comparator(), iter, kMaxSequenceNumber);
-    //iter = new TailingIterator(env_, this, options, cfd);
+    auto iter = new ForwardIterator(env_, this, options, cfd);
+    return NewDBIterator(env_, *cfd->options(), cfd->user_comparator(), iter,
+                         kMaxSequenceNumber);
+// return new TailingIterator(env_, this, options, cfd);
 #endif
   } else {
     SequenceNumber latest_snapshot = versions_->LastSequence();
     SuperVersion* sv = nullptr;
     sv = cfd->GetReferencedSuperVersion(&mutex_);
-    iter = NewInternalIterator(options, cfd, sv);
-
     auto snapshot =
         options.snapshot != nullptr
            ? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
            : latest_snapshot;
-    iter = NewDBIterator(env_, *cfd->options(),
-                         cfd->user_comparator(), iter, snapshot);
-  }
-  return iter;
+    // Try to generate a DB iterator tree in continuous memory area to be
+    // cache friendly.
Here is an example of result: + // +-------------------------------+ + // | | + // | ArenaWrappedDBIter | + // | + | + // | +---> Inner Iterator ------------+ + // | | | | + // | | +-- -- -- -- -- -- -- --+ | + // | +--- | Arena | | + // | | | | + // | Allocated Memory: | | + // | | +-------------------+ | + // | | | DBIter | <---+ + // | | + | + // | | | +-> iter_ ------------+ + // | | | | | + // | | +-------------------+ | + // | | | MergingIterator | <---+ + // | | + | + // | | | +->child iter1 ------------+ + // | | | | | | + // | | +->child iter2 ----------+ | + // | | | | | | | + // | | | +->child iter3 --------+ | | + // | | | | | | + // | | +-------------------+ | | | + // | | | Iterator1 | <--------+ + // | | +-------------------+ | | + // | | | Iterator2 | <------+ + // | | +-------------------+ | + // | | | Iterator3 | <----+ + // | | +-------------------+ + // | | | + // +-------+-----------------------+ + // + // ArenaWrappedDBIter inlines an arena area where all the iterartor in the + // the iterator tree is allocated in the order of being accessed when + // querying. + // Laying out the iterators in the order of being accessed makes it more + // likely that any iterator pointer is close to the iterator it points to so + // that they are likely to be in the same cache line and/or page. + ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( + env_, *cfd->options(), cfd->user_comparator(), snapshot); + Iterator* internal_iter = + NewInternalIterator(options, cfd, sv, db_iter->GetArena()); + db_iter->SetIterUnderDBIter(internal_iter); + + return db_iter; + } } Status DBImpl::NewIterators( diff --git a/db/db_impl.h b/db/db_impl.h index 3c1bc5fc52..6049db8d6d 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -39,6 +39,7 @@ class Version; class VersionEdit; class VersionSet; class CompactionFilterV2; +class Arena; class DBImpl : public DB { public: @@ -278,7 +279,8 @@ class DBImpl : public DB { const DBOptions options_; Iterator* NewInternalIterator(const ReadOptions&, ColumnFamilyData* cfd, - SuperVersion* super_version); + SuperVersion* super_version, + Arena* arena = nullptr); private: friend class DB; diff --git a/db/db_iter.cc b/db/db_iter.cc index 3de620dfff..4c206f02fc 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -18,6 +18,7 @@ #include "rocksdb/iterator.h" #include "rocksdb/merge_operator.h" #include "port/port.h" +#include "util/arena.h" #include "util/logging.h" #include "util/mutexlock.h" #include "util/perf_context_imp.h" @@ -37,8 +38,6 @@ static void DumpInternalIter(Iterator* iter) { } #endif -namespace { - // Memtables and sstables that make the DB representation contain // (userkey,seq,type) => uservalue entries. 
DBIter // combines multiple entries for the same userkey found in the DB @@ -57,9 +56,10 @@ class DBIter: public Iterator { kReverse }; - DBIter(Env* env, const Options& options, - const Comparator* cmp, Iterator* iter, SequenceNumber s) - : env_(env), + DBIter(Env* env, const Options& options, const Comparator* cmp, + Iterator* iter, SequenceNumber s, bool arena_mode) + : arena_mode_(arena_mode), + env_(env), logger_(options.info_log.get()), user_comparator_(cmp), user_merge_operator_(options.merge_operator.get()), @@ -74,7 +74,15 @@ class DBIter: public Iterator { } virtual ~DBIter() { RecordTick(statistics_, NO_ITERATORS, -1); - delete iter_; + if (!arena_mode_) { + delete iter_; + } else { + iter_->~Iterator(); + } + } + virtual void SetIter(Iterator* iter) { + assert(iter_ == nullptr); + iter_ = iter; } virtual bool Valid() const { return valid_; } virtual Slice key() const { @@ -116,11 +124,12 @@ class DBIter: public Iterator { } } + bool arena_mode_; Env* const env_; Logger* logger_; const Comparator* const user_comparator_; const MergeOperator* const user_merge_operator_; - Iterator* const iter_; + Iterator* iter_; SequenceNumber const sequence_; Status status_; @@ -461,16 +470,48 @@ void DBIter::SeekToLast() { FindPrevUserEntry(); } -} // anonymous namespace +Iterator* NewDBIterator(Env* env, const Options& options, + const Comparator* user_key_comparator, + Iterator* internal_iter, + const SequenceNumber& sequence) { + return new DBIter(env, options, user_key_comparator, internal_iter, sequence, + false); +} -Iterator* NewDBIterator( - Env* env, - const Options& options, - const Comparator *user_key_comparator, - Iterator* internal_iter, +ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~Iterator(); } + +void ArenaWrappedDBIter::SetDBIter(DBIter* iter) { db_iter_ = iter; } + +void ArenaWrappedDBIter::SetIterUnderDBIter(Iterator* iter) { + static_cast(db_iter_)->SetIter(iter); +} + +inline bool ArenaWrappedDBIter::Valid() const { return db_iter_->Valid(); } +inline void ArenaWrappedDBIter::SeekToFirst() { db_iter_->SeekToFirst(); } +inline void ArenaWrappedDBIter::SeekToLast() { db_iter_->SeekToLast(); } +inline void ArenaWrappedDBIter::Seek(const Slice& target) { + db_iter_->Seek(target); +} +inline void ArenaWrappedDBIter::Next() { db_iter_->Next(); } +inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); } +inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); } +inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); } +inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); } +void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1, + void* arg2) { + db_iter_->RegisterCleanup(function, arg1, arg2); +} + +ArenaWrappedDBIter* NewArenaWrappedDbIterator( + Env* env, const Options& options, const Comparator* user_key_comparator, const SequenceNumber& sequence) { - return new DBIter(env, options, user_key_comparator, - internal_iter, sequence); + ArenaWrappedDBIter* iter = new ArenaWrappedDBIter(); + Arena* arena = iter->GetArena(); + auto mem = arena->AllocateAligned(sizeof(DBIter)); + DBIter* db_iter = new (mem) + DBIter(env, options, user_key_comparator, nullptr, sequence, true); + iter->SetDBIter(db_iter); + return iter; } } // namespace rocksdb diff --git a/db/db_iter.h b/db/db_iter.h index d8a3bad51d..cb9840324f 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -11,9 +11,14 @@ #include #include "rocksdb/db.h" #include "db/dbformat.h" +#include "util/arena.h" +#include "util/autovector.h" 
 namespace rocksdb {
 
+class Arena;
+class DBIter;
+
 // Return a new iterator that converts internal keys (yielded by
 // "*internal_iter") that were live at the specified "sequence" number
 // into appropriate user keys.
@@ -24,4 +29,45 @@ extern Iterator* NewDBIterator(
     Iterator* internal_iter,
     const SequenceNumber& sequence);
 
+// A wrapper iterator which wraps DB Iterator and the arena, with which the DB
+// iterator is supposed to be allocated. This class is used as an entry point
+// of an iterator hierarchy whose memory can be allocated inline. In that way,
+// accessing the iterator tree can be more cache friendly. It is also faster
+// to allocate.
+class ArenaWrappedDBIter : public Iterator {
+ public:
+  virtual ~ArenaWrappedDBIter();
+
+  // Get the arena to be used to allocate memory for DBIter to be wrapped,
+  // as well as child iterators in it.
+  virtual Arena* GetArena() { return &arena_; }
+
+  // Set the DB Iterator to be wrapped
+
+  virtual void SetDBIter(DBIter* iter);
+
+  // Set the internal iterator wrapped inside the DB Iterator. Usually it is
+  // a merging iterator.
+  virtual void SetIterUnderDBIter(Iterator* iter);
+  virtual bool Valid() const override;
+  virtual void SeekToFirst() override;
+  virtual void SeekToLast() override;
+  virtual void Seek(const Slice& target) override;
+  virtual void Next() override;
+  virtual void Prev() override;
+  virtual Slice key() const override;
+  virtual Slice value() const override;
+  virtual Status status() const override;
+  void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2);
+
+ private:
+  DBIter* db_iter_;
+  Arena arena_;
+};
+
+// Generate the arena wrapped iterator class.
+extern ArenaWrappedDBIter* NewArenaWrappedDbIterator(
+    Env* env, const Options& options, const Comparator* user_key_comparator,
+    const SequenceNumber& sequence);
+
 } // namespace rocksdb
diff --git a/db/memtable.cc b/db/memtable.cc
index 45f58b979d..c6b915b997 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -20,6 +20,7 @@
 #include "rocksdb/iterator.h"
 #include "rocksdb/merge_operator.h"
 #include "rocksdb/slice_transform.h"
+#include "table/merger.h"
 #include "util/arena.h"
 #include "util/coding.h"
 #include "util/murmurhash.h"
@@ -173,15 +174,24 @@ const char* EncodeKey(std::string* scratch, const Slice& target) {
 class MemTableIterator: public Iterator {
  public:
   MemTableIterator(const MemTable& mem, const ReadOptions& options,
-                   bool enforce_total_order)
+                   bool enforce_total_order, Arena* arena)
       : bloom_(nullptr),
         prefix_extractor_(mem.prefix_extractor_),
-        valid_(false) {
+        valid_(false),
+        arena_mode_(arena != nullptr) {
     if (prefix_extractor_ != nullptr && !enforce_total_order) {
       bloom_ = mem.prefix_bloom_.get();
-      iter_.reset(mem.table_->GetDynamicPrefixIterator());
+      iter_ = mem.table_->GetDynamicPrefixIterator(arena);
     } else {
-      iter_.reset(mem.table_->GetIterator());
+      iter_ = mem.table_->GetIterator(arena);
+    }
+  }
+
+  ~MemTableIterator() {
+    if (arena_mode_) {
+      iter_->~Iterator();
+    } else {
+      delete iter_;
     }
   }
 
@@ -228,8 +238,9 @@ class MemTableIterator: public Iterator {
  private:
   DynamicBloom* bloom_;
   const SliceTransform* const prefix_extractor_;
-  std::unique_ptr<MemTableRep::Iterator> iter_;
+  MemTableRep::Iterator* iter_;
   bool valid_;
+  bool arena_mode_;
 
   // No copying allowed
   MemTableIterator(const MemTableIterator&);
@@ -237,8 +248,14 @@
 };
 
 Iterator* MemTable::NewIterator(const ReadOptions& options,
-                                bool enforce_total_order) {
-  return new MemTableIterator(*this, options, enforce_total_order);
+                                bool enforce_total_order, Arena* arena) {
+  if (arena == nullptr) {
+    return new MemTableIterator(*this, options, enforce_total_order, nullptr);
+  } else {
+    auto mem = arena->AllocateAligned(sizeof(MemTableIterator));
+    return new (mem)
+        MemTableIterator(*this, options, enforce_total_order, arena);
+  }
 }
 
 port::RWMutex* MemTable::GetLock(const Slice& key) {
diff --git a/db/memtable.h b/db/memtable.h
index 7e9af35048..8bad2773a3 100644
--- a/db/memtable.h
+++ b/db/memtable.h
@@ -21,6 +21,7 @@
 
 namespace rocksdb {
 
+class Arena;
 class Mutex;
 class MemTableIterator;
 class MergeContext;
@@ -77,8 +78,12 @@ class MemTable {
   //
   // By default, it returns an iterator for prefix seek if prefix_extractor
   // is configured in Options.
+  // arena: If not null, the arena needs to be used to allocate the Iterator.
+  //        Calling ~Iterator of the iterator will destroy all the states but
+  //        those allocated in arena.
   Iterator* NewIterator(const ReadOptions& options,
-                        bool enforce_total_order = false);
+                        bool enforce_total_order = false,
+                        Arena* arena = nullptr);
 
   // Add an entry into memtable that maps key to value at the
   // specified sequence number and with the specified type.
diff --git a/db/memtable_list.cc b/db/memtable_list.cc
index 2354219625..de1a18eee2 100644
--- a/db/memtable_list.cc
+++ b/db/memtable_list.cc
@@ -11,6 +11,7 @@
 #include "db/version_set.h"
 #include "rocksdb/env.h"
 #include "rocksdb/iterator.h"
+#include "table/merger.h"
 #include "util/coding.h"
 #include "util/log_buffer.h"
 
@@ -78,6 +79,14 @@ void MemTableListVersion::AddIterators(const ReadOptions& options,
   }
 }
 
+void MemTableListVersion::AddIterators(
+    const ReadOptions& options, MergeIteratorBuilder* merge_iter_builder) {
+  for (auto& m : memlist_) {
+    merge_iter_builder->AddIterator(
+        m->NewIterator(options, merge_iter_builder->GetArena()));
+  }
+}
+
 uint64_t MemTableListVersion::GetTotalNumEntries() const {
   uint64_t total_num = 0;
   for (auto& m : memlist_) {
diff --git a/db/memtable_list.h b/db/memtable_list.h
index d85380b552..e56710fc98 100644
--- a/db/memtable_list.h
+++ b/db/memtable_list.h
@@ -28,6 +28,7 @@ namespace rocksdb {
 class ColumnFamilyData;
 class InternalKeyComparator;
 class Mutex;
+class MergeIteratorBuilder;
 
 // keeps a list of immutable memtables in a vector. the list is immutable
 // if refcount is bigger than one.
It is used as a state for Get() and @@ -49,6 +50,9 @@ class MemTableListVersion { void AddIterators(const ReadOptions& options, std::vector* iterator_list); + void AddIterators(const ReadOptions& options, + MergeIteratorBuilder* merge_iter_builder); + uint64_t GetTotalNumEntries() const; private: diff --git a/db/simple_table_db_test.cc b/db/simple_table_db_test.cc index affa614657..a86ff0a17d 100644 --- a/db/simple_table_db_test.cc +++ b/db/simple_table_db_test.cc @@ -83,7 +83,7 @@ public: unique_ptr && file, uint64_t file_size, unique_ptr* table_reader); - Iterator* NewIterator(const ReadOptions&) override; + Iterator* NewIterator(const ReadOptions&, Arena* arena) override; Status Get(const ReadOptions&, const Slice& key, void* arg, bool (*handle_result)(void* arg, const ParsedInternalKey& k, @@ -218,8 +218,14 @@ std::shared_ptr SimpleTableReader::GetTableProperties() return rep_->table_properties; } -Iterator* SimpleTableReader::NewIterator(const ReadOptions& options) { - return new SimpleTableIterator(this); +Iterator* SimpleTableReader::NewIterator(const ReadOptions& options, + Arena* arena) { + if (arena == nullptr) { + return new SimpleTableIterator(this); + } else { + auto mem = arena->AllocateAligned(sizeof(SimpleTableIterator)); + return new (mem) SimpleTableIterator(this); + } } Status SimpleTableReader::GetOffset(const Slice& target, uint64_t* offset) { diff --git a/db/table_cache.cc b/db/table_cache.cc index 2321d035a2..f4757cbfe4 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -13,6 +13,7 @@ #include "db/version_edit.h" #include "rocksdb/statistics.h" +#include "table/iterator_wrapper.h" #include "table/table_reader.h" #include "util/coding.h" #include "util/stop_watch.h" @@ -102,7 +103,7 @@ Iterator* TableCache::NewIterator(const ReadOptions& options, const InternalKeyComparator& icomparator, const FileMetaData& file_meta, TableReader** table_reader_ptr, - bool for_compaction) { + bool for_compaction, Arena* arena) { if (table_reader_ptr != nullptr) { *table_reader_ptr = nullptr; } @@ -113,12 +114,12 @@ Iterator* TableCache::NewIterator(const ReadOptions& options, s = FindTable(toptions, icomparator, file_meta.number, file_meta.file_size, &handle, nullptr, options.read_tier == kBlockCacheTier); if (!s.ok()) { - return NewErrorIterator(s); + return NewErrorIterator(s, arena); } table_reader = GetTableReaderFromHandle(handle); } - Iterator* result = table_reader->NewIterator(options); + Iterator* result = table_reader->NewIterator(options, arena); if (handle != nullptr) { result->RegisterCleanup(&UnrefEntry, cache_, handle); } diff --git a/db/table_cache.h b/db/table_cache.h index e8cd7ea2e2..1aa61db014 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -23,6 +23,7 @@ namespace rocksdb { class Env; +class Arena; struct FileMetaData; // TODO(sdong): try to come up with a better API to pass the file information @@ -44,7 +45,7 @@ class TableCache { const InternalKeyComparator& internal_comparator, const FileMetaData& file_meta, TableReader** table_reader_ptr = nullptr, - bool for_compaction = false); + bool for_compaction = false, Arena* arena = nullptr); // If a seek to internal key "k" in specified file finds an entry, // call (*handle_result)(arg, found_key, found_value) repeatedly until diff --git a/db/version_set.cc b/db/version_set.cc index 5327cf55fd..c6a9e6ab12 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -332,6 +332,32 @@ void Version::AddIterators(const ReadOptions& read_options, } } +void Version::AddIterators(const ReadOptions& read_options, 
+ const EnvOptions& soptions, + MergeIteratorBuilder* merge_iter_builder) { + // Merge all level zero files together since they may overlap + for (const FileMetaData* file : files_[0]) { + merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator( + read_options, soptions, cfd_->internal_comparator(), *file, nullptr, + false, merge_iter_builder->GetArena())); + } + + // For levels > 0, we can use a concatenating iterator that sequentially + // walks through the non-overlapping files in the level, opening them + // lazily. + for (int level = 1; level < num_levels_; level++) { + if (!files_[level].empty()) { + merge_iter_builder->AddIterator(NewTwoLevelIterator( + new LevelFileIteratorState( + cfd_->table_cache(), read_options, soptions, + cfd_->internal_comparator(), false /* for_compaction */, + cfd_->options()->prefix_extractor != nullptr), + new LevelFileNumIterator(cfd_->internal_comparator(), &files_[level]), + merge_iter_builder->GetArena())); + } + } +} + // Callback from TableCache::Get() namespace { enum SaverState { diff --git a/db/version_set.h b/db/version_set.h index 852c0323b1..7c8d7146e8 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -51,6 +51,7 @@ class MergeContext; class ColumnFamilyData; class ColumnFamilySet; class TableCache; +class MergeIteratorBuilder; // Return the smallest index i such that files[i]->largest >= key. // Return files.size() if there is no such file. @@ -80,6 +81,9 @@ class Version { void AddIterators(const ReadOptions&, const EnvOptions& soptions, std::vector* iters); + void AddIterators(const ReadOptions&, const EnvOptions& soptions, + MergeIteratorBuilder* merger_iter_builder); + // Lookup the value for key. If found, store it in *val and // return OK. Else return a non-OK status. Fills *stats. // Uses *operands to store merge_operator operations to apply later diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index ac376d7475..6134fd1660 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -142,16 +142,28 @@ class MemTableRep { }; // Return an iterator over the keys in this representation. - virtual Iterator* GetIterator() = 0; + // arena: If not null, the arena needs to be used to allocate the Iterator. + // When destroying the iterator, the caller will not call "delete" + // but Iterator::~Iterator() directly. The destructor needs to destroy + // all the states but those allocated in arena. + virtual Iterator* GetIterator(Arena* arena = nullptr) = 0; // Return an iterator over at least the keys with the specified user key. The // iterator may also allow access to other keys, but doesn't have to. Default: // GetIterator(). - virtual Iterator* GetIterator(const Slice& user_key) { return GetIterator(); } + virtual Iterator* GetIterator(const Slice& user_key) { + return GetIterator(nullptr); + } // Return an iterator that has a special Seek semantics. The result of // a Seek might only include keys with the same prefix as the target key. - virtual Iterator* GetDynamicPrefixIterator() { return GetIterator(); } + // arena: If not null, the arena needs to be used to allocate the Iterator. + // When destroying the iterator, the caller will not call "delete" + // but Iterator::~Iterator() directly. The destructor needs to destroy + // all the states but those allocated in arena. + virtual Iterator* GetDynamicPrefixIterator(Arena* arena = nullptr) { + return GetIterator(arena); + } // Return true if the current MemTableRep supports merge operator. 
// Default: true diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 4fcdfd29eb..71fff659a4 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -980,10 +980,11 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_key) { return may_match; } -Iterator* BlockBasedTable::NewIterator(const ReadOptions& read_options) { - return NewTwoLevelIterator(new BlockEntryIteratorState(this, read_options, - nullptr), - NewIndexIterator(read_options)); +Iterator* BlockBasedTable::NewIterator(const ReadOptions& read_options, + Arena* arena) { + return NewTwoLevelIterator( + new BlockEntryIteratorState(this, read_options, nullptr), + NewIndexIterator(read_options), arena); } Status BlockBasedTable::Get( diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index cfb1d0af01..ba6a10c3e2 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -68,7 +68,7 @@ class BlockBasedTable : public TableReader { // Returns a new iterator over the table contents. // The result of NewIterator() is initially invalid (caller must // call one of the Seek methods on the iterator before using it). - Iterator* NewIterator(const ReadOptions&) override; + Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr) override; Status Get(const ReadOptions& readOptions, const Slice& key, void* handle_context, diff --git a/table/iterator.cc b/table/iterator.cc index a3d4f638db..4c360205a2 100644 --- a/table/iterator.cc +++ b/table/iterator.cc @@ -8,6 +8,8 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "rocksdb/iterator.h" +#include "table/iterator_wrapper.h" +#include "util/arena.h" namespace rocksdb { @@ -65,8 +67,26 @@ Iterator* NewEmptyIterator() { return new EmptyIterator(Status::OK()); } +Iterator* NewEmptyIterator(Arena* arena) { + if (arena == nullptr) { + return NewEmptyIterator(); + } else { + auto mem = arena->AllocateAligned(sizeof(EmptyIterator)); + return new (mem) EmptyIterator(Status::OK()); + } +} + Iterator* NewErrorIterator(const Status& status) { return new EmptyIterator(status); } +Iterator* NewErrorIterator(const Status& status, Arena* arena) { + if (arena == nullptr) { + return NewErrorIterator(status); + } else { + auto mem = arena->AllocateAligned(sizeof(EmptyIterator)); + return new (mem) EmptyIterator(status); + } +} + } // namespace rocksdb diff --git a/table/iterator_wrapper.h b/table/iterator_wrapper.h index 117c9fdc45..502cacb3e8 100644 --- a/table/iterator_wrapper.h +++ b/table/iterator_wrapper.h @@ -23,14 +23,7 @@ class IteratorWrapper { explicit IteratorWrapper(Iterator* iter): iter_(nullptr) { Set(iter); } - IteratorWrapper(const IteratorWrapper&) { - // Iterator wrapper exclusively owns iter_ so it cannot be copied. - // Didn't delete the function because vector requires - // this function to compile. 
- assert(false); - } - void operator=(const IteratorWrapper&) = delete; - ~IteratorWrapper() { delete iter_; } + ~IteratorWrapper() {} Iterator* iter() const { return iter_; } // Takes ownership of "iter" and will delete it when destroyed, or @@ -45,6 +38,14 @@ class IteratorWrapper { } } + void DeleteIter(bool is_arena_mode) { + if (!is_arena_mode) { + delete iter_; + } else { + iter_->~Iterator(); + } + } + // Iterator interface methods bool Valid() const { return valid_; } Slice key() const { assert(Valid()); return key_; } @@ -70,4 +71,11 @@ class IteratorWrapper { Slice key_; }; +class Arena; +// Return an empty iterator (yields nothing) allocated from arena. +extern Iterator* NewEmptyIterator(Arena* arena); + +// Return an empty iterator with the specified status, allocated arena. +extern Iterator* NewErrorIterator(const Status& status, Arena* arena); + } // namespace rocksdb diff --git a/table/merger.cc b/table/merger.cc index 6c51f7fa15..9aab33ed36 100644 --- a/table/merger.cc +++ b/table/merger.cc @@ -17,13 +17,13 @@ #include "rocksdb/options.h" #include "table/iter_heap.h" #include "table/iterator_wrapper.h" +#include "util/arena.h" #include "util/stop_watch.h" #include "util/perf_context_imp.h" #include "util/autovector.h" namespace rocksdb { namespace { - typedef std::priority_queue< IteratorWrapper*, std::vector, @@ -43,13 +43,16 @@ MaxIterHeap NewMaxIterHeap(const Comparator* comparator) { MinIterHeap NewMinIterHeap(const Comparator* comparator) { return MinIterHeap(MinIteratorComparator(comparator)); } +} // namespace const size_t kNumIterReserve = 4; class MergingIterator : public Iterator { public: - MergingIterator(const Comparator* comparator, Iterator** children, int n) - : comparator_(comparator), + MergingIterator(const Comparator* comparator, Iterator** children, int n, + bool is_arena_mode) + : is_arena_mode_(is_arena_mode), + comparator_(comparator), current_(nullptr), use_heap_(true), direction_(kForward), @@ -66,7 +69,20 @@ class MergingIterator : public Iterator { } } - virtual ~MergingIterator() { } + virtual void AddIterator(Iterator* iter) { + assert(direction_ == kForward); + children_.emplace_back(iter); + auto new_wrapper = children_.back(); + if (new_wrapper.Valid()) { + minHeap_.push(&new_wrapper); + } + } + + virtual ~MergingIterator() { + for (auto& child : children_) { + child.DeleteIter(is_arena_mode_); + } + } virtual bool Valid() const { return (current_ != nullptr); @@ -242,6 +258,7 @@ class MergingIterator : public Iterator { void FindLargest(); void ClearHeaps(); + bool is_arena_mode_; const Comparator* comparator_; autovector children_; IteratorWrapper* current_; @@ -288,16 +305,51 @@ void MergingIterator::ClearHeaps() { maxHeap_ = NewMaxIterHeap(comparator_); minHeap_ = NewMinIterHeap(comparator_); } -} // namespace -Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n) { +Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n, + Arena* arena) { assert(n >= 0); if (n == 0) { - return NewEmptyIterator(); + return NewEmptyIterator(arena); } else if (n == 1) { return list[0]; } else { - return new MergingIterator(cmp, list, n); + if (arena == nullptr) { + return new MergingIterator(cmp, list, n, false); + } else { + auto mem = arena->AllocateAligned(sizeof(MergingIterator)); + return new (mem) MergingIterator(cmp, list, n, true); + } + } +} + +MergeIteratorBuilder::MergeIteratorBuilder(const Comparator* comparator, + Arena* a) + : first_iter(nullptr), use_merging_iter(false), arena(a) { + + auto mem = 
arena->AllocateAligned(sizeof(MergingIterator)); + merge_iter = new (mem) MergingIterator(comparator, nullptr, 0, true); +} + +void MergeIteratorBuilder::AddIterator(Iterator* iter) { + if (!use_merging_iter && first_iter != nullptr) { + merge_iter->AddIterator(first_iter); + use_merging_iter = true; + } + if (use_merging_iter) { + merge_iter->AddIterator(iter); + } else { + first_iter = iter; + } +} + +Iterator* MergeIteratorBuilder::Finish() { + if (!use_merging_iter) { + return first_iter; + } else { + auto ret = merge_iter; + merge_iter = nullptr; + return ret; } } diff --git a/table/merger.h b/table/merger.h index 3a1a4feb8c..7dcf2afe78 100644 --- a/table/merger.h +++ b/table/merger.h @@ -9,11 +9,14 @@ #pragma once +#include "rocksdb/types.h" + namespace rocksdb { class Comparator; class Iterator; class Env; +class Arena; // Return an iterator that provided the union of the data in // children[0,n-1]. Takes ownership of the child iterators and @@ -24,6 +27,34 @@ class Env; // // REQUIRES: n >= 0 extern Iterator* NewMergingIterator(const Comparator* comparator, - Iterator** children, int n); + Iterator** children, int n, + Arena* arena = nullptr); + +class MergingIterator; + +// A builder class to build a merging iterator by adding iterators one by one. +class MergeIteratorBuilder { + public: + // comparator: the comparator used in merging comparator + // arena: where the merging iterator needs to be allocated from. + explicit MergeIteratorBuilder(const Comparator* comparator, Arena* arena); + ~MergeIteratorBuilder() {} + + // Add iter to the merging iterator. + void AddIterator(Iterator* iter); + + // Get arena used to build the merging iterator. It is called one a child + // iterator needs to be allocated. + Arena* GetArena() { return arena; } + + // Return the result merging iterator. 
+ Iterator* Finish(); + + private: + MergingIterator* merge_iter; + Iterator* first_iter; + bool use_merging_iter; + Arena* arena; +}; } // namespace rocksdb diff --git a/table/plain_table_reader.cc b/table/plain_table_reader.cc index 43daaa9a93..22968ef6b7 100644 --- a/table/plain_table_reader.cc +++ b/table/plain_table_reader.cc @@ -156,8 +156,15 @@ Status PlainTableReader::Open(const Options& options, void PlainTableReader::SetupForCompaction() { } -Iterator* PlainTableReader::NewIterator(const ReadOptions& options) { - return new PlainTableIterator(this, options_.prefix_extractor != nullptr); +Iterator* PlainTableReader::NewIterator(const ReadOptions& options, + Arena* arena) { + if (arena == nullptr) { + return new PlainTableIterator(this, options_.prefix_extractor != nullptr); + } else { + auto mem = arena->AllocateAligned(sizeof(PlainTableIterator)); + return new (mem) + PlainTableIterator(this, options_.prefix_extractor != nullptr); + } } struct PlainTableReader::IndexRecord { diff --git a/table/plain_table_reader.h b/table/plain_table_reader.h index e6373dc827..62239beb32 100644 --- a/table/plain_table_reader.h +++ b/table/plain_table_reader.h @@ -55,7 +55,7 @@ class PlainTableReader: public TableReader { const int bloom_bits_per_key, double hash_table_ratio, size_t index_sparseness, size_t huge_page_tlb_size); - Iterator* NewIterator(const ReadOptions&); + Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr) override; Status Get(const ReadOptions&, const Slice& key, void* arg, bool (*result_handler)(void* arg, const ParsedInternalKey& k, diff --git a/table/table_reader.h b/table/table_reader.h index 02a2d16dc3..9238b880c7 100644 --- a/table/table_reader.h +++ b/table/table_reader.h @@ -15,6 +15,7 @@ namespace rocksdb { class Iterator; struct ParsedInternalKey; class Slice; +class Arena; struct ReadOptions; struct TableProperties; @@ -28,7 +29,11 @@ class TableReader { // Returns a new iterator over the table contents. // The result of NewIterator() is initially invalid (caller must // call one of the Seek methods on the iterator before using it). - virtual Iterator* NewIterator(const ReadOptions&) = 0; + // arena: If not null, the arena needs to be used to allocate the Iterator. + // When destroying the iterator, the caller will not call "delete" + // but Iterator::~Iterator() directly. The destructor needs to destroy + // all the states but those allocated in arena. 
+ virtual Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr) = 0; // Given a key, return an approximate byte offset in the file where // the data for that key begins (or would begin if the key were diff --git a/table/two_level_iterator.cc b/table/two_level_iterator.cc index 990f18184e..6af48f58ce 100644 --- a/table/two_level_iterator.cc +++ b/table/two_level_iterator.cc @@ -13,6 +13,7 @@ #include "rocksdb/table.h" #include "table/block.h" #include "table/format.h" +#include "util/arena.h" namespace rocksdb { @@ -23,7 +24,10 @@ class TwoLevelIterator: public Iterator { explicit TwoLevelIterator(TwoLevelIteratorState* state, Iterator* first_level_iter); - virtual ~TwoLevelIterator() {} + virtual ~TwoLevelIterator() { + first_level_iter_.DeleteIter(false); + second_level_iter_.DeleteIter(false); + } virtual void Seek(const Slice& target); virtual void SeekToFirst(); @@ -183,8 +187,13 @@ void TwoLevelIterator::InitDataBlock() { } // namespace Iterator* NewTwoLevelIterator(TwoLevelIteratorState* state, - Iterator* first_level_iter) { - return new TwoLevelIterator(state, first_level_iter); + Iterator* first_level_iter, Arena* arena) { + if (arena == nullptr) { + return new TwoLevelIterator(state, first_level_iter); + } else { + auto mem = arena->AllocateAligned(sizeof(TwoLevelIterator)); + return new (mem) TwoLevelIterator(state, first_level_iter); + } } } // namespace rocksdb diff --git a/table/two_level_iterator.h b/table/two_level_iterator.h index b8083385b0..d955dd7631 100644 --- a/table/two_level_iterator.h +++ b/table/two_level_iterator.h @@ -16,6 +16,7 @@ namespace rocksdb { struct ReadOptions; class InternalKeyComparator; +class Arena; struct TwoLevelIteratorState { explicit TwoLevelIteratorState(bool check_prefix_may_match) @@ -39,7 +40,11 @@ struct TwoLevelIteratorState { // // Uses a supplied function to convert an index_iter value into // an iterator over the contents of the corresponding block. +// arena: If not null, the arena is used to allocate the Iterator. +// When destroying the iterator, the destructor will destroy +// all the states but those allocated in arena. extern Iterator* NewTwoLevelIterator(TwoLevelIteratorState* state, - Iterator* first_level_iter); + Iterator* first_level_iter, + Arena* arena = nullptr); } // namespace rocksdb diff --git a/util/arena.h b/util/arena.h index 53acd2270b..0855c205c4 100644 --- a/util/arena.h +++ b/util/arena.h @@ -23,6 +23,8 @@ namespace rocksdb { class Logger; +const size_t kInlineSize = 2048; + class Arena { public: // No copying allowed diff --git a/util/hash_cuckoo_rep.cc b/util/hash_cuckoo_rep.cc index a8864692f2..e2d2c38e66 100644 --- a/util/hash_cuckoo_rep.cc +++ b/util/hash_cuckoo_rep.cc @@ -251,7 +251,7 @@ class HashCuckooRep : public MemTableRep { // are sorted according to the user specified KeyComparator. Note that // any insert after this function call may affect the sorted nature of // the returned iterator. 
-  virtual MemTableRep::Iterator* GetIterator() override {
+  virtual MemTableRep::Iterator* GetIterator(Arena* arena) override {
     std::vector<const char*> compact_buckets;
     for (unsigned int bid = 0; bid < bucket_count_; ++bid) {
       const char* bucket = cuckoo_array_[bid].load(std::memory_order_relaxed);
@@ -266,10 +266,18 @@ class HashCuckooRep : public MemTableRep {
         compact_buckets.push_back(iter->key());
       }
     }
-    return new Iterator(
-        std::shared_ptr<std::vector<const char*>>(
-            new std::vector<const char*>(std::move(compact_buckets))),
-        compare_);
+    if (arena == nullptr) {
+      return new Iterator(
+          std::shared_ptr<std::vector<const char*>>(
+              new std::vector<const char*>(std::move(compact_buckets))),
+          compare_);
+    } else {
+      auto mem = arena->AllocateAligned(sizeof(Iterator));
+      return new (mem) Iterator(
+          std::shared_ptr<std::vector<const char*>>(
+              new std::vector<const char*>(std::move(compact_buckets))),
+          compare_);
+    }
   }
 };
diff --git a/util/hash_linklist_rep.cc b/util/hash_linklist_rep.cc
index 80edad505c..60f245b5ff 100644
--- a/util/hash_linklist_rep.cc
+++ b/util/hash_linklist_rep.cc
@@ -70,11 +70,12 @@ class HashLinkListRep : public MemTableRep {
 
   virtual ~HashLinkListRep();
 
-  virtual MemTableRep::Iterator* GetIterator() override;
+  virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override;
 
   virtual MemTableRep::Iterator* GetIterator(const Slice& slice) override;
 
-  virtual MemTableRep::Iterator* GetDynamicPrefixIterator() override;
+  virtual MemTableRep::Iterator* GetDynamicPrefixIterator(
+      Arena* arena = nullptr) override;
 
  private:
   friend class DynamicIterator;
@@ -411,7 +412,7 @@ void HashLinkListRep::Get(const LookupKey& k, void* callback_args,
   }
 }
 
-MemTableRep::Iterator* HashLinkListRep::GetIterator() {
+MemTableRep::Iterator* HashLinkListRep::GetIterator(Arena* alloc_arena) {
   // allocate a new arena of similar size to the one currently in use
   Arena* new_arena = new Arena(arena_->BlockSize());
   auto list = new FullList(compare_, new_arena);
@@ -424,7 +425,12 @@ MemTableRep::Iterator* HashLinkListRep::GetIterator() {
       }
     }
   }
-  return new FullListIterator(list, new_arena);
+  if (alloc_arena == nullptr) {
+    return new FullListIterator(list, new_arena);
+  } else {
+    auto mem = alloc_arena->AllocateAligned(sizeof(FullListIterator));
+    return new (mem) FullListIterator(list, new_arena);
+  }
 }
 
 MemTableRep::Iterator* HashLinkListRep::GetIterator(const Slice& slice) {
@@ -435,8 +441,14 @@ MemTableRep::Iterator* HashLinkListRep::GetIterator(const Slice& slice) {
   return new Iterator(this, bucket);
 }
 
-MemTableRep::Iterator* HashLinkListRep::GetDynamicPrefixIterator() {
-  return new DynamicIterator(*this);
+MemTableRep::Iterator* HashLinkListRep::GetDynamicPrefixIterator(
+    Arena* alloc_arena) {
+  if (alloc_arena == nullptr) {
+    return new DynamicIterator(*this);
+  } else {
+    auto mem = alloc_arena->AllocateAligned(sizeof(DynamicIterator));
+    return new (mem) DynamicIterator(*this);
+  }
 }
 
 bool HashLinkListRep::BucketContains(Node* head, const Slice& user_key) const {
diff --git a/util/hash_skiplist_rep.cc b/util/hash_skiplist_rep.cc
index 1f03874d19..baee12ad53 100644
--- a/util/hash_skiplist_rep.cc
+++ b/util/hash_skiplist_rep.cc
@@ -38,11 +38,12 @@ class HashSkipListRep : public MemTableRep {
 
   virtual ~HashSkipListRep();
 
-  virtual MemTableRep::Iterator* GetIterator() override;
+  virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override;
 
   virtual MemTableRep::Iterator* GetIterator(const Slice& slice) override;
 
-  virtual MemTableRep::Iterator* GetDynamicPrefixIterator() override;
+  virtual MemTableRep::Iterator* GetDynamicPrefixIterator(
+      Arena* arena = nullptr) override;
 
  private:
   friend class DynamicIterator;
@@ -288,7 +289,7 @@ void HashSkipListRep::Get(const LookupKey& k, void* callback_args,
   }
 }
 
-MemTableRep::Iterator* HashSkipListRep::GetIterator() {
+MemTableRep::Iterator* HashSkipListRep::GetIterator(Arena* arena) {
   // allocate a new arena of similar size to the one currently in use
   Arena* new_arena = new Arena(arena_->BlockSize());
   auto list = new Bucket(compare_, new_arena);
@@ -301,7 +302,12 @@ MemTableRep::Iterator* HashSkipListRep::GetIterator() {
       }
     }
   }
-  return new Iterator(list, true, new_arena);
+  if (arena == nullptr) {
+    return new Iterator(list, true, new_arena);
+  } else {
+    auto mem = arena->AllocateAligned(sizeof(Iterator));
+    return new (mem) Iterator(list, true, new_arena);
+  }
 }
 
 MemTableRep::Iterator* HashSkipListRep::GetIterator(const Slice& slice) {
@@ -312,8 +318,13 @@ MemTableRep::Iterator* HashSkipListRep::GetIterator(const Slice& slice) {
   return new Iterator(bucket, false);
 }
 
-MemTableRep::Iterator* HashSkipListRep::GetDynamicPrefixIterator() {
-  return new DynamicIterator(*this);
+MemTableRep::Iterator* HashSkipListRep::GetDynamicPrefixIterator(Arena* arena) {
+  if (arena == nullptr) {
+    return new DynamicIterator(*this);
+  } else {
+    auto mem = arena->AllocateAligned(sizeof(DynamicIterator));
+    return new (mem) DynamicIterator(*this);
+  }
 }
 
 } // anon namespace
diff --git a/util/skiplistrep.cc b/util/skiplistrep.cc
index f36edf28d9..895343001b 100644
--- a/util/skiplistrep.cc
+++ b/util/skiplistrep.cc
@@ -6,6 +6,7 @@
 #include "rocksdb/memtablerep.h"
 #include "db/memtable.h"
 #include "db/skiplist.h"
+#include "util/arena.h"
 
 namespace rocksdb {
 namespace {
@@ -108,8 +109,13 @@ public:
   // Unhide default implementations of GetIterator
   using MemTableRep::GetIterator;
 
-  virtual MemTableRep::Iterator* GetIterator() override {
-    return new SkipListRep::Iterator(&skip_list_);
+  virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override {
+    if (arena == nullptr) {
+      return new SkipListRep::Iterator(&skip_list_);
+    } else {
+      auto mem = arena->AllocateAligned(sizeof(SkipListRep::Iterator));
+      return new (mem) SkipListRep::Iterator(&skip_list_);
+    }
   }
 };
 }
diff --git a/util/vectorrep.cc b/util/vectorrep.cc
index 00e5c7450b..cf8bad5c4d 100644
--- a/util/vectorrep.cc
+++ b/util/vectorrep.cc
@@ -95,7 +95,7 @@ class VectorRep : public MemTableRep {
   using MemTableRep::GetIterator;
 
   // Return an iterator over the keys in this representation.
-  virtual MemTableRep::Iterator* GetIterator() override;
+  virtual MemTableRep::Iterator* GetIterator(Arena* arena) override;
 
  private:
   friend class Iterator;
@@ -259,16 +259,28 @@ void VectorRep::Get(const LookupKey& k, void* callback_args,
   }
 }
 
-MemTableRep::Iterator* VectorRep::GetIterator() {
+MemTableRep::Iterator* VectorRep::GetIterator(Arena* arena) {
+  char* mem = nullptr;
+  if (arena != nullptr) {
+    mem = arena->AllocateAligned(sizeof(Iterator));
+  }
   ReadLock l(&rwlock_);
   // Do not sort here. The sorting would be done the first time
   // a Seek is performed on the iterator.
   if (immutable_) {
-    return new Iterator(this, bucket_, compare_);
+    if (arena == nullptr) {
+      return new Iterator(this, bucket_, compare_);
+    } else {
+      return new (mem) Iterator(this, bucket_, compare_);
+    }
   } else {
     std::shared_ptr<Bucket> tmp;
     tmp.reset(new Bucket(*bucket_)); // make a copy
-    return new Iterator(nullptr, tmp, compare_);
+    if (arena == nullptr) {
+      return new Iterator(nullptr, tmp, compare_);
+    } else {
+      return new (mem) Iterator(nullptr, tmp, compare_);
+    }
   }
 }
 } // anon namespace
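A note on the allocation pattern used throughout this patch (editor's sketch, not part of the diff): iterator objects are carved out of an Arena via AllocateAligned() plus placement new, and are destroyed by invoking the destructor directly rather than calling delete, because the arena owns and releases the memory. Below is a minimal, self-contained C++ illustration of that idea, using ToyArena and ToyIterator as hypothetical stand-ins rather than the real rocksdb::Arena and Iterator types.

#include <cstddef>
#include <new>
#include <vector>

// Simplified stand-in for rocksdb::Arena: hands out suitably aligned chunks
// and frees everything at once when the arena itself is destroyed.
class ToyArena {
 public:
  ~ToyArena() {
    for (void* p : blocks_) ::operator delete(p);
  }
  char* AllocateAligned(size_t bytes) {
    void* p = ::operator new(bytes);  // aligned for any ordinary object type
    blocks_.push_back(p);
    return static_cast<char*>(p);
  }

 private:
  std::vector<void*> blocks_;
};

struct ToyIterator {
  explicit ToyIterator(int id) : id_(id) {}
  ~ToyIterator() {}  // must be invoked manually when arena-allocated
  int id_;
};

int main() {
  ToyArena arena;
  // Placement-new into arena memory, as the patch does for MergingIterator,
  // DBIter, MemTableIterator, the memtable-rep iterators, etc.
  char* mem = arena.AllocateAligned(sizeof(ToyIterator));
  ToyIterator* it = new (mem) ToyIterator(1);
  // Destroy by calling the destructor directly; `delete it` would be wrong
  // here because the memory belongs to the arena, not the heap.
  it->~ToyIterator();
  return 0;
}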