diff --git a/HISTORY.md b/HISTORY.md
index a808682d97..158a2d03e4 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -14,6 +14,11 @@
 * Added is_manual_compaction to CompactionFilter::Context
 * Added "virtual void WaitForJoin() = 0" in class Env
 
+### New Features
+* If we find a truncated record at the end of a MANIFEST or WAL file, we will
+  ignore it. We assume that the writer of the record was interrupted and that
+  the record can safely be ignored.
+
 ## 2.7.0 (01/28/2014)
 
 ### Public API changes
diff --git a/Makefile b/Makefile
index ecb52ff194..ff39259acc 100644
--- a/Makefile
+++ b/Makefile
@@ -12,6 +12,10 @@
 OPT += -O2 -fno-omit-frame-pointer -momit-leaf-frame-pointer
 else
 OPT += -fno-omit-frame-pointer -momit-leaf-frame-pointer
 endif
+
+ifeq ($(MAKECMDGOALS),shared_lib)
+PLATFORM_SHARED_LDFLAGS=-fPIC
+endif
 
 #-----------------------------------------------
 # detect what platform we're building on
@@ -136,8 +140,8 @@ $(SHARED2): $(SHARED3)
 	ln -fs $(SHARED3) $(SHARED2)
 endif
 
-$(SHARED3): $(LIBOBJECTS)
-	$(CXX) $(PLATFORM_SHARED_LDFLAGS)$(SHARED2) $(CXXFLAGS) $(PLATFORM_SHARED_CFLAGS) $(LDFLAGS) $(SOURCES)-o $@
+$(SHARED3):
+	$(CXX) $(PLATFORM_SHARED_LDFLAGS)$(SHARED2) $(CXXFLAGS) $(PLATFORM_SHARED_CFLAGS) $(LDFLAGS) $(SOURCES) -o $@
 endif  # PLATFORM_SHARED_EXT
diff --git a/db/column_family.cc b/db/column_family.cc
index 1cf46c5a8b..b626634bcb 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -151,6 +151,18 @@
   refs.store(1, std::memory_order_relaxed);
 }
 
+namespace {
+void SuperVersionUnrefHandle(void* ptr) {
+  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
+  if (sv->Unref()) {
+    sv->db_mutex->Lock();
+    sv->Cleanup();
+    sv->db_mutex->Unlock();
+    delete sv;
+  }
+}
+}  // anonymous namespace
+
 ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id,
                                    const std::string& name,
                                    Version* dummy_versions, Cache* table_cache,
@@ -173,6 +185,7 @@
       imm_(options.min_write_buffer_number_to_merge),
       super_version_(nullptr),
       super_version_number_(0),
+      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
       next_(nullptr),
       prev_(nullptr),
       log_number_(0),
@@ -209,6 +222,20 @@
   prev->next_ = next;
   next->prev_ = prev;
 
+  // Release the SuperVersion reference kept in ThreadLocalPtr.
+  // This must be done outside of mutex_ since the unref handler can lock the
+  // mutex. It also needs to be done after FlushMemTable, which can trigger
+  // local_sv_ access.
+  auto sv = static_cast<SuperVersion*>(local_sv_->Get());
+  if (sv != nullptr) {
+    auto mutex = sv->db_mutex;
+    mutex->Unlock();
+    delete local_sv_;
+    mutex->Lock();
+  } else {
+    delete local_sv_;
+  }
+
   if (super_version_ != nullptr) {
     bool is_last_reference __attribute__((unused));
     is_last_reference = super_version_->Unref();
@@ -276,11 +303,13 @@
 }
 
 SuperVersion* ColumnFamilyData::InstallSuperVersion(
-    SuperVersion* new_superversion) {
+    SuperVersion* new_superversion, port::Mutex* db_mutex) {
   new_superversion->Init(mem_, imm_.current(), current_);
   SuperVersion* old_superversion = super_version_;
   super_version_ = new_superversion;
   ++super_version_number_;
+  super_version_->version_number = super_version_number_;
+  super_version_->db_mutex = db_mutex;
   if (old_superversion != nullptr && old_superversion->Unref()) {
     old_superversion->Cleanup();
     return old_superversion;  // will let caller delete outside of mutex
@@ -288,6 +317,19 @@
   return nullptr;
 }
 
+void ColumnFamilyData::ResetThreadLocalSuperVersions() {
+  autovector<void*> sv_ptrs;
+  local_sv_->Scrape(&sv_ptrs);
+  for (auto ptr : sv_ptrs) {
+    assert(ptr);
+    auto sv = static_cast<SuperVersion*>(ptr);
+    if (sv->Unref()) {
+      sv->Cleanup();
+      delete sv;
+    }
+  }
+}
+
 ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                  const DBOptions* db_options,
                                  const EnvOptions& storage_options,
diff --git a/db/column_family.h b/db/column_family.h
index 378afaf750..2da8061076 100644
--- a/db/column_family.h
+++ b/db/column_family.h
@@ -19,6 +19,7 @@
 #include "db/memtable_list.h"
 #include "db/write_batch_internal.h"
 #include "db/table_cache.h"
+#include "util/thread_local.h"
 
 namespace rocksdb {
 
@@ -72,6 +73,9 @@
   // all memtables that we need to free through this vector. We then
   // delete all those memtables outside of mutex, during destruction
   autovector<MemTable*> to_delete;
+  // Version number of the current SuperVersion
+  uint64_t version_number;
+  port::Mutex* db_mutex;
 
   // should be called outside the mutex
   SuperVersion() = default;
@@ -159,6 +163,12 @@
   }
 
   SuperVersion* GetSuperVersion() const { return super_version_; }
+  SuperVersion* GetAndResetThreadLocalSuperVersion() const {
+    return static_cast<SuperVersion*>(local_sv_->Swap(nullptr));
+  }
+  void SetThreadLocalSuperVersion(SuperVersion* super_version) {
+    local_sv_->Reset(static_cast<void*>(super_version));
+  }
   uint64_t GetSuperVersionNumber() const {
     return super_version_number_.load();
   }
@@ -166,7 +176,10 @@
   // if its reference count is zero and needs deletion or nullptr if not
   // As argument takes a pointer to allocated SuperVersion to enable
   // the clients to allocate SuperVersion outside of mutex.
-  SuperVersion* InstallSuperVersion(SuperVersion* new_superversion);
+  SuperVersion* InstallSuperVersion(SuperVersion* new_superversion,
+                                    port::Mutex* db_mutex);
+
+  void ResetThreadLocalSuperVersions();
 
   // A Flag indicating whether write needs to slowdown because of there are
   // too many number of level0 files.
@@ -212,6 +225,10 @@
   // changes.
   std::atomic<uint64_t> super_version_number_;
 
+  // Thread's local copy of SuperVersion pointer
+  // This needs to be destructed before mutex_
+  ThreadLocalPtr* local_sv_;
+
   // pointers for a circular linked list. we use it to support iterations
   // that can be concurrent with writes
   ColumnFamilyData* next_;
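The ThreadLocalPtr machinery above is easier to see in isolation. Below is a minimal, self-contained sketch of the same caching scheme — these are simplified stand-ins, not RocksDB's actual ThreadLocalPtr or SuperVersion — showing how a reader swaps the cached pointer out for exclusive use, validates it against the current version number, and falls back to the mutex only when the cache is stale:

```cpp
#include <atomic>
#include <cstdint>
#include <iostream>
#include <mutex>

struct Version {
  explicit Version(uint64_t n) : number(n) {}
  void Ref() { refs.fetch_add(1, std::memory_order_relaxed); }
  bool Unref() { return refs.fetch_sub(1) == 1; }  // true => caller must delete
  std::atomic<int> refs{1};
  uint64_t number;
};

std::mutex db_mutex;                      // plays the role of DBImpl::mutex_
Version* current = new Version(0);        // plays the role of super_version_
std::atomic<uint64_t> current_number{0};  // super_version_number_
thread_local Version* cached = nullptr;   // one local_sv_ slot

Version* AcquireForRead() {
  Version* v = cached;
  cached = nullptr;  // like Swap(nullptr): we now have exclusive use of v
  if (v == nullptr || v->number != current_number.load()) {
    if (v != nullptr && v->Unref()) delete v;  // drop the stale cached copy
    std::lock_guard<std::mutex> l(db_mutex);   // slow path: take the mutex
    v = current;
    v->Ref();
  }
  return v;
}

void ReleaseAfterRead(Version* v) { cached = v; }  // like Reset(v)

int main() {
  Version* v = AcquireForRead();  // slow path: nothing cached yet
  std::cout << "read against version " << v->number << "\n";
  ReleaseAfterRead(v);

  {  // install a new version, as a flush/compaction would
    std::lock_guard<std::mutex> l(db_mutex);
    Version* old = current;
    current = new Version(current_number.fetch_add(1) + 1);
    if (old->Unref()) delete old;
  }

  v = AcquireForRead();  // stale cache detected; refreshed under the mutex
  std::cout << "read against version " << v->number << "\n";
  ReleaseAfterRead(v);
}
```

On top of this, the real code registers SuperVersionUnrefHandle so slots abandoned by exiting threads are unreferenced, and Scrape()s all slots when a new SuperVersion is installed, as seen in ResetThreadLocalSuperVersions() above.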
diff --git a/db/db_impl.cc b/db/db_impl.cc
index bbf559d998..665c8cc08b 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -43,6 +43,7 @@
 #include "db/write_batch_internal.h"
 #include "port/port.h"
 #include "rocksdb/cache.h"
+#include "port/likely.h"
 #include "rocksdb/compaction_filter.h"
 #include "rocksdb/db.h"
 #include "rocksdb/env.h"
@@ -238,8 +239,8 @@
       delayed_writes_(0),
       storage_options_(options),
       bg_work_gate_closed_(false),
-      refitting_level_(false) {
-
+      refitting_level_(false),
+      opened_successfully_(false) {
   env_->GetAbsolutePath(dbname, &db_absolute_path_);
 
   // Reserve ten files or so for other uses and give the rest to TableCache.
@@ -298,6 +299,26 @@
          bg_logstats_scheduled_) {
     bg_cv_.Wait();
   }
+
+  if (options_.allow_thread_local) {
+    // Clean up obsolete files due to SuperVersion release.
+    // (1) Need to delete obsolete files before closing because RepairDB()
+    // scans all existing files in the file system and builds the manifest
+    // file. Keeping obsolete files confuses the repair process.
+    // (2) Need to check whether we Open()/Recover()ed the DB successfully
+    // before deleting, because if VersionSet recovery fails (possibly due to a
+    // corrupted manifest file), it is not able to identify live files
+    // correctly. As a result, all "live" files could get deleted by accident.
+    // However, a corrupted manifest is recoverable by RepairDB().
+    if (opened_successfully_) {
+      DeletionState deletion_state;
+      FindObsoleteFiles(deletion_state, true);
+      // manifest number starting from 2
+      deletion_state.manifest_file_number = 1;
+      PurgeObsoleteFiles(deletion_state);
+    }
+  }
+
   mutex_.Unlock();
 
   if (default_cf_handle_ != nullptr) {
     // we need to delete handle outside of lock because it does its own locking
@@ -358,7 +379,8 @@ Status DBImpl::NewDB() {
   const std::string manifest = DescriptorFileName(dbname_, 1);
   unique_ptr<WritableFile> file;
-  Status s = env_->NewWritableFile(manifest, &file, storage_options_);
+  Status s = env_->NewWritableFile(manifest, &file,
+                                   storage_options_.AdaptForLogWrite());
   if (!s.ok()) {
     return s;
   }
@@ -1229,6 +1251,10 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
 
   if (s.ok()) {
     InstallSuperVersion(cfd, deletion_state);
+    // Reset SuperVersions cached in thread local storage
+    if (options_.allow_thread_local) {
+      cfd->ResetThreadLocalSuperVersions();
+    }
     if (madeProgress) {
       *madeProgress = 1;
     }
@@ -1361,7 +1387,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
         edit.DebugString().data());
     status = versions_->LogAndApply(cfd, &edit, &mutex_, db_directory_.get());
-    superversion_to_free = cfd->InstallSuperVersion(new_superversion);
+    superversion_to_free = cfd->InstallSuperVersion(new_superversion, &mutex_);
     new_superversion = nullptr;
 
     Log(options_.info_log, "LogAndApply: %s\n", status.ToString().data());
@@ -1406,8 +1432,9 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() const {
   return versions_->LastSequence();
 }
 
-Status DBImpl::GetUpdatesSince(SequenceNumber seq,
-                               unique_ptr<TransactionLogIterator>* iter) {
+Status DBImpl::GetUpdatesSince(
+    SequenceNumber seq, unique_ptr<TransactionLogIterator>* iter,
+    const TransactionLogIterator::ReadOptions& read_options) {
 
   RecordTick(options_.statistics.get(), GET_UPDATES_SINCE_CALLS);
   if (seq > versions_->LastSequence()) {
@@ -1427,13 +1454,9 @@
   if (!s.ok()) {
     return s;
   }
-  iter->reset(new TransactionLogIteratorImpl(options_.wal_dir,
-                                             &options_,
-                                             storage_options_,
-                                             seq,
-                                             std::move(wal_files),
-                                             this));
+  iter->reset(new TransactionLogIteratorImpl(options_.wal_dir, &options_,
+                                             read_options, storage_options_,
+                                             seq, std::move(wal_files), this));
   return (*iter)->status();
 }
@@ -2004,6 +2027,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
     status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_,
                                     db_directory_.get());
     InstallSuperVersion(c->column_family_data(), deletion_state);
+    if (options_.allow_thread_local) {
+      c->column_family_data()->ResetThreadLocalSuperVersions();
+    }
 
     Version::LevelSummaryStorage tmp;
     Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
@@ -2815,7 +2841,7 @@ Status DBImpl::Get(const ReadOptions& options,
 
 // DeletionState gets created and destructed outside of the lock -- we
 // use this conveniently to:
 // * malloc one SuperVersion() outside of the lock -- new_superversion
-// * delete one SuperVersion() outside of the lock -- superversion_to_free
+// * delete SuperVersion()s outside of the lock -- superversions_to_free
 //
 // However, if InstallSuperVersion() gets called twice with the same
 // deletion_state, we can't reuse the SuperVersion() that got malloced because
@@ -2829,14 +2855,10 @@ void DBImpl::InstallSuperVersion(ColumnFamilyData* cfd,
   SuperVersion* new_superversion =
       (deletion_state.new_superversion != nullptr) ?
       deletion_state.new_superversion : new SuperVersion();
-  SuperVersion* old_superversion = cfd->InstallSuperVersion(new_superversion);
+  SuperVersion* old_superversion =
+      cfd->InstallSuperVersion(new_superversion, &mutex_);
   deletion_state.new_superversion = nullptr;
-  if (deletion_state.superversion_to_free != nullptr) {
-    // somebody already put it there
-    delete old_superversion;
-  } else {
-    deletion_state.superversion_to_free = old_superversion;
-  }
+  deletion_state.superversions_to_free.push_back(old_superversion);
 }
 
 Status DBImpl::GetImpl(const ReadOptions& options,
@@ -2849,10 +2871,6 @@
   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
   auto cfd = cfh->cfd();
 
-  mutex_.Lock();
-  SuperVersion* get_version = cfd->GetSuperVersion()->Ref();
-  mutex_.Unlock();
-
   SequenceNumber snapshot;
   if (options.snapshot != nullptr) {
     snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
@@ -2860,6 +2878,41 @@
     snapshot = versions_->LastSequence();
   }
 
+  // Acquire SuperVersion
+  SuperVersion* sv = nullptr;
+  if (LIKELY(options_.allow_thread_local)) {
+    // The SuperVersion is cached in thread local storage to avoid acquiring
+    // the mutex when the SuperVersion has not changed since the last use. When
+    // a new SuperVersion is installed, the compaction or flush thread cleans
+    // up the cached SuperVersion in all existing thread local storage. To
+    // avoid acquiring the mutex for this operation, we use an atomic Swap() on
+    // the thread local pointer to guarantee exclusive access. If the thread
+    // local pointer is being used while a new SuperVersion is installed, the
+    // cached SuperVersion can become stale. It will eventually get refreshed,
+    // either on the next GetImpl() call or the next SuperVersion installation.
+    sv = cfd->GetAndResetThreadLocalSuperVersion();
+    if (!sv || sv->version_number != cfd->GetSuperVersionNumber()) {
+      RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_UPDATES);
+      SuperVersion* sv_to_delete = nullptr;
+
+      if (sv && sv->Unref()) {
+        mutex_.Lock();
+        sv->Cleanup();
+        sv_to_delete = sv;
+      } else {
+        mutex_.Lock();
+      }
+      sv = cfd->GetSuperVersion()->Ref();
+      mutex_.Unlock();
+
+      delete sv_to_delete;
+    }
+  } else {
+    mutex_.Lock();
+    sv = cfd->GetSuperVersion()->Ref();
+    mutex_.Unlock();
+  }
+
   bool have_stat_update = false;
   Version::GetStats stats;
 
@@ -2872,12 +2925,11 @@
   // merge_operands will contain the sequence of merges in the latter case.
   LookupKey lkey(key, snapshot);
   BumpPerfTime(&perf_context.get_snapshot_time, &snapshot_timer);
-  if (get_version->mem->Get(lkey, value, &s, merge_context,
-                            *cfd->full_options())) {
+  if (sv->mem->Get(lkey, value, &s, merge_context, *cfd->full_options())) {
     // Done
     RecordTick(options_.statistics.get(), MEMTABLE_HIT);
-  } else if (get_version->imm->Get(lkey, value, &s, merge_context,
-                                   *cfd->full_options())) {
+  } else if (sv->imm->Get(lkey, value, &s, merge_context,
+                          *cfd->full_options())) {
     // Done
     RecordTick(options_.statistics.get(), MEMTABLE_HIT);
   } else {
@@ -2885,8 +2937,8 @@
     StopWatchNano from_files_timer(env_, false);
     StartPerfTimer(&from_files_timer);
 
-    get_version->current->Get(options, lkey, value, &s, &merge_context, &stats,
-                              *cfd->full_options(), value_found);
+    sv->current->Get(options, lkey, value, &s, &merge_context, &stats,
+                     *cfd->full_options(), value_found);
     have_stat_update = true;
     BumpPerfTime(&perf_context.get_from_output_files_time, &from_files_timer);
     RecordTick(options_.statistics.get(), MEMTABLE_MISS);
@@ -2895,31 +2947,32 @@
 
   StopWatchNano post_process_timer(env_, false);
   StartPerfTimer(&post_process_timer);
-  bool delete_get_version = false;
   if (!cfd->options()->disable_seek_compaction && have_stat_update) {
     mutex_.Lock();
-    if (get_version->current->UpdateStats(stats)) {
+    if (sv->current->UpdateStats(stats)) {
       MaybeScheduleFlushOrCompaction();
     }
-    if (get_version->Unref()) {
-      get_version->Cleanup();
-      delete_get_version = true;
-    }
     mutex_.Unlock();
-  } else {
-    if (get_version->Unref()) {
-      mutex_.Lock();
-      get_version->Cleanup();
-      mutex_.Unlock();
-      delete_get_version = true;
-    }
   }
-  if (delete_get_version) {
-    delete get_version;
+
+  // Release SuperVersion
+  if (LIKELY(options_.allow_thread_local)) {
+    // Put the SuperVersion back
+    cfd->SetThreadLocalSuperVersion(sv);
+  } else {
+    bool delete_sv = false;
+    if (sv->Unref()) {
+      mutex_.Lock();
+      sv->Cleanup();
+      mutex_.Unlock();
+      delete_sv = true;
+    }
+    if (delete_sv) {
+      delete sv;
+    }
   }
 
   // Note, tickers are atomic now - no lock protection needed any more.
-
   RecordTick(options_.statistics.get(), NUMBER_KEYS_READ);
   RecordTick(options_.statistics.get(), BYTES_READ, value->size());
   BumpPerfTime(&perf_context.get_post_process_time, &post_process_timer);
@@ -3074,6 +3127,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
   auto cfd =
       versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
   assert(cfd != nullptr);
+  delete cfd->InstallSuperVersion(new SuperVersion(), &mutex_);
   *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
   Log(options_.info_log, "Created column family \"%s\" (ID %u)",
       column_family_name.c_str(), (unsigned)cfd->GetID());
@@ -3575,11 +3629,9 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
       SuperVersion* new_superversion = nullptr;
       mutex_.Unlock();
       {
-        EnvOptions soptions(storage_options_);
-        soptions.use_mmap_writes = false;
         DelayLoggingAndReset();
         s = env_->NewWritableFile(LogFileName(options_.wal_dir, new_log_number),
-                                  &lfile, soptions);
+                                  &lfile, storage_options_.AdaptForLogWrite());
         if (s.ok()) {
           // Our final size should be less than write_buffer_size
           // (compression, etc) but err on the side of caution.
@@ -3621,7 +3673,7 @@
               cfd->GetID(), (unsigned long)logfile_number_);
       force = false;  // Do not force another compaction if have room
       MaybeScheduleFlushOrCompaction();
-      delete cfd->InstallSuperVersion(new_superversion);
+      delete cfd->InstallSuperVersion(new_superversion, &mutex_);
     }
   }
   return s;
@@ -3888,7 +3940,6 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
                 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
   *dbptr = nullptr;
   handles->clear();
-  EnvOptions soptions(db_options);
 
   size_t max_write_buffer_size = 0;
   for (auto cf : column_families) {
@@ -3918,12 +3969,10 @@
   if (s.ok()) {
     uint64_t new_log_number = impl->versions_->NewFileNumber();
     unique_ptr<WritableFile> lfile;
-    soptions.use_mmap_writes = false;
+    EnvOptions soptions(db_options);
     s = impl->options_.env->NewWritableFile(
-        LogFileName(impl->options_.wal_dir, new_log_number),
-        &lfile,
-        soptions
-    );
+        LogFileName(impl->options_.wal_dir, new_log_number), &lfile,
+        soptions.AdaptForLogWrite());
     if (s.ok()) {
       lfile->SetPreallocationBlockSize(1.1 * max_write_buffer_size);
       VersionEdit edit;
@@ -3953,7 +4002,7 @@
   }
   if (s.ok()) {
     for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
-      delete cfd->InstallSuperVersion(new SuperVersion());
+      delete cfd->InstallSuperVersion(new SuperVersion(), &impl->mutex_);
       impl->alive_log_files_.push_back(impl->logfile_number_);
     }
     impl->DeleteObsoleteFiles();
@@ -3985,6 +4034,7 @@
   impl->mutex_.Unlock();
 
   if (s.ok()) {
+    impl->opened_successfully_ = true;
     *dbptr = impl;
   } else {
     for (auto h : *handles) {
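The DeletionState comment above captures a general discipline: do heap allocation and deallocation outside the critical section, and only swap pointers while holding the mutex. A minimal sketch of that pattern with hypothetical names (State stands in for SuperVersion; this is not RocksDB code):

```cpp
#include <mutex>
#include <vector>

struct State { int dummy = 0; };

std::mutex mu;
State* current_state = new State();

// Swap in `fresh` under the lock; collect the old state for deletion later.
void Install(State* fresh, std::vector<State*>* to_free) {
  std::lock_guard<std::mutex> l(mu);
  to_free->push_back(current_state);  // defer the delete; never free under lock
  current_state = fresh;
}

int main() {
  State* fresh = new State();  // "malloc one SuperVersion outside of the lock"
  std::vector<State*> to_free;
  Install(fresh, &to_free);
  for (State* s : to_free) delete s;  // "delete SuperVersion()s outside the lock"
}
```

Switching superversion_to_free from a single pointer to the superversions_to_free vector (below, in db_impl.h) is what lets InstallSuperVersion() be called more than once per DeletionState without leaking.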
diff --git a/db/db_impl.h b/db/db_impl.h
index f82eb69517..128de8ed1a 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -28,6 +28,7 @@
 #include "rocksdb/transaction_log.h"
 #include "util/autovector.h"
 #include "util/stats_logger.h"
+#include "util/thread_local.h"
 #include "db/internal_stats.h"
 
 namespace rocksdb {
@@ -121,8 +122,10 @@
                                bool flush_memtable = true);
   virtual Status GetSortedWalFiles(VectorLogPtr& files);
   virtual SequenceNumber GetLatestSequenceNumber() const;
-  virtual Status GetUpdatesSince(SequenceNumber seq_number,
-                                 unique_ptr<TransactionLogIterator>* iter);
+  virtual Status GetUpdatesSince(
+      SequenceNumber seq_number, unique_ptr<TransactionLogIterator>* iter,
+      const TransactionLogIterator::ReadOptions&
+          read_options = TransactionLogIterator::ReadOptions());
   virtual Status DeleteFile(std::string name);
 
   virtual void GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata);
@@ -204,7 +207,7 @@
     // a list of memtables to be free
     autovector<MemTable*> memtables_to_free;
 
-    SuperVersion* superversion_to_free;  // if nullptr nothing to free
+    autovector<SuperVersion*> superversions_to_free;
 
     SuperVersion* new_superversion;  // if nullptr no new superversion
 
@@ -216,7 +219,6 @@
       manifest_file_number = 0;
       log_number = 0;
       prev_log_number = 0;
-      superversion_to_free = nullptr;
      new_superversion = create_superversion ? new SuperVersion() : nullptr;
     }
 
@@ -225,8 +227,10 @@
       for (auto m : memtables_to_free) {
         delete m;
       }
-      // free superversion. if nullptr, this will be noop
-      delete superversion_to_free;
+      // free superversions
+      for (auto s : superversions_to_free) {
+        delete s;
+      }
       // if new_superversion was not used, it will be non-nullptr and needs
       // to be freed here
       delete new_superversion;
@@ -476,6 +480,9 @@
   // Guard against multiple concurrent refitting
   bool refitting_level_;
 
+  // Indicates whether the DB was opened successfully
+  bool opened_successfully_;
+
   // No copying allowed
   DBImpl(const DBImpl&);
   void operator=(const DBImpl&);
diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc
index b179ff5f8d..15725b335b 100644
--- a/db/db_impl_readonly.cc
+++ b/db/db_impl_readonly.cc
@@ -103,7 +103,7 @@ Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
                                  error_if_log_file_exist);
   if (s.ok()) {
     for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
-      delete cfd->InstallSuperVersion(new SuperVersion());
+      delete cfd->InstallSuperVersion(new SuperVersion(), &impl->mutex_);
     }
   }
   impl->mutex_.Unlock();
diff --git a/db/db_test.cc b/db/db_test.cc
index d3a88d7cf4..ca5ea4c297 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -5288,8 +5288,10 @@ class ModelDB: public DB {
   virtual SequenceNumber GetLatestSequenceNumber() const {
     return 0;
   }
-  virtual Status GetUpdatesSince(rocksdb::SequenceNumber,
-                                 unique_ptr<rocksdb::TransactionLogIterator>*) {
+  virtual Status GetUpdatesSince(
+      rocksdb::SequenceNumber, unique_ptr<rocksdb::TransactionLogIterator>*,
+      const TransactionLogIterator::ReadOptions&
+          read_options = TransactionLogIterator::ReadOptions()) {
     return Status::NotSupported("Not supported in Model DB");
   }
 
diff --git a/db/log_reader.cc b/db/log_reader.cc
index 1dc567413d..be1fb8ceb6 100644
--- a/db/log_reader.cc
+++ b/db/log_reader.cc
@@ -140,7 +140,9 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch) {
 
       case kEof:
         if (in_fragmented_record) {
-          ReportCorruption(scratch->size(), "partial record without end(3)");
+          // This can be caused by the writer dying immediately after
+          // writing a physical record but before completing the next; don't
+          // treat it as a corruption, just ignore the entire logical record.
           scratch->clear();
         }
         return false;
@@ -264,13 +266,12 @@
         eof_offset_ = buffer_.size();
       }
       continue;
-    } else if (buffer_.size() == 0) {
-      // End of file
-      return kEof;
     } else {
-      size_t drop_size = buffer_.size();
+      // Note that if buffer_ is non-empty, we have a truncated header at the
+      // end of the file, which can be caused by the writer crashing in the
+      // middle of writing the header. Instead of considering this an error,
+      // just report EOF.
       buffer_.clear();
-      ReportCorruption(drop_size, "truncated record at end of file");
       return kEof;
     }
   }
@@ -284,14 +285,22 @@
     if (kHeaderSize + length > buffer_.size()) {
       size_t drop_size = buffer_.size();
       buffer_.clear();
-      ReportCorruption(drop_size, "bad record length");
-      return kBadRecord;
+      if (!eof_) {
+        ReportCorruption(drop_size, "bad record length");
+        return kBadRecord;
+      }
+      // If the end of the file has been reached without reading |length| bytes
+      // of payload, assume the writer died in the middle of writing the
+      // record. Don't report a corruption.
+      return kEof;
     }
 
     if (type == kZeroType && length == 0) {
       // Skip zero length record without reporting any drops since
       // such records are produced by the mmap based writing code in
       // env_posix.cc that preallocates file regions.
+      // NOTE: this should never happen in DBs written by new RocksDB versions,
+      // since we turn off mmap writes to manifest and log files
       buffer_.clear();
       return kBadRecord;
     }
diff --git a/db/log_test.cc b/db/log_test.cc
index 6365188358..b28343e638 100644
--- a/db/log_test.cc
+++ b/db/log_test.cc
@@ -446,20 +446,32 @@ TEST(LogTest, BadRecordType) {
   ASSERT_EQ("OK", MatchError("unknown record type"));
 }
 
-TEST(LogTest, TruncatedTrailingRecord) {
+TEST(LogTest, TruncatedTrailingRecordIsIgnored) {
   Write("foo");
   ShrinkSize(4);  // Drop all payload as well as a header byte
   ASSERT_EQ("EOF", Read());
-  ASSERT_EQ((unsigned int)(kHeaderSize - 1), DroppedBytes());
-  ASSERT_EQ("OK", MatchError("truncated record at end of file"));
+  // Truncated last record is ignored, not treated as an error
+  ASSERT_EQ(0, DroppedBytes());
+  ASSERT_EQ("", ReportMessage());
 }
 
 TEST(LogTest, BadLength) {
+  const int kPayloadSize = kBlockSize - kHeaderSize;
+  Write(BigString("bar", kPayloadSize));
+  Write("foo");
+  // Least significant size byte is stored in header[4].
+  IncrementByte(4, 1);
+  ASSERT_EQ("foo", Read());
+  ASSERT_EQ(kBlockSize, DroppedBytes());
+  ASSERT_EQ("OK", MatchError("bad record length"));
+}
+
+TEST(LogTest, BadLengthAtEndIsIgnored) {
   Write("foo");
   ShrinkSize(1);
   ASSERT_EQ("EOF", Read());
-  ASSERT_EQ((unsigned int)(kHeaderSize + 2), DroppedBytes());
-  ASSERT_EQ("OK", MatchError("bad record length"));
+  ASSERT_EQ(0, DroppedBytes());
+  ASSERT_EQ("", ReportMessage());
 }
 
 TEST(LogTest, ChecksumMismatch) {
@@ -510,6 +522,24 @@ TEST(LogTest, UnexpectedFirstType) {
   ASSERT_EQ("OK", MatchError("partial record without end"));
 }
 
+TEST(LogTest, MissingLastIsIgnored) {
+  Write(BigString("bar", kBlockSize));
+  // Remove the LAST block, including header.
+  ShrinkSize(14);
+  ASSERT_EQ("EOF", Read());
+  ASSERT_EQ("", ReportMessage());
+  ASSERT_EQ(0, DroppedBytes());
+}
+
+TEST(LogTest, PartialLastIsIgnored) {
+  Write(BigString("bar", kBlockSize));
+  // Cause a bad record length in the LAST block.
+  ShrinkSize(1);
+  ASSERT_EQ("EOF", Read());
+  ASSERT_EQ("", ReportMessage());
+  ASSERT_EQ(0, DroppedBytes());
+}
+
 TEST(LogTest, ErrorJoinsRecords) {
   // Consider two fragmented records:
   //    first(R1) last(R1) first(R2) last(R2)
diff --git a/db/repair.cc b/db/repair.cc
index a3f311a253..597ddd6580 100644
--- a/db/repair.cc
+++ b/db/repair.cc
@@ -251,7 +251,6 @@
   }
 
   void ExtractMetaData() {
-    std::vector<TableInfo> kept;
     for (size_t i = 0; i < table_numbers_.size(); i++) {
       TableInfo t;
       t.meta.number = table_numbers_[i];
@@ -317,7 +316,8 @@
   Status WriteDescriptor() {
     std::string tmp = TempFileName(dbname_, 1);
     unique_ptr<WritableFile> file;
-    Status status = env_->NewWritableFile(tmp, &file, storage_options_);
+    Status status =
+        env_->NewWritableFile(tmp, &file, storage_options_.AdaptForLogWrite());
     if (!status.ok()) {
       return status;
     }
diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc
index 8a9b988abe..0394855c30 100644
--- a/db/transaction_log_impl.cc
+++ b/db/transaction_log_impl.cc
@@ -10,10 +10,12 @@
 namespace rocksdb {
 
 TransactionLogIteratorImpl::TransactionLogIteratorImpl(
     const std::string& dir, const DBOptions* options,
+    const TransactionLogIterator::ReadOptions& read_options,
     const EnvOptions& soptions, const SequenceNumber seq,
     std::unique_ptr<VectorLogPtr> files, DBImpl const* const dbimpl)
     : dir_(dir),
       options_(options),
+      read_options_(read_options),
       soptions_(soptions),
       startingSequenceNumber_(seq),
       files_(std::move(files)),
@@ -250,9 +252,8 @@ Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) {
     return status;
   }
   assert(file);
-  currentLogReader_.reset(
-    new log::Reader(std::move(file), &reporter_, true, 0)
-  );
+  currentLogReader_.reset(new log::Reader(std::move(file), &reporter_,
+                                          read_options_.verify_checksums_, 0));
   return Status::OK();
 }
 }  // namespace rocksdb
diff --git a/db/transaction_log_impl.h b/db/transaction_log_impl.h
index 2d725bfa7e..98e4e26b4e 100644
--- a/db/transaction_log_impl.h
+++ b/db/transaction_log_impl.h
@@ -66,11 +66,11 @@ class LogFileImpl : public LogFile {
 
 class TransactionLogIteratorImpl : public TransactionLogIterator {
  public:
-  TransactionLogIteratorImpl(const std::string& dir, const DBOptions* options,
-                             const EnvOptions& soptions,
-                             const SequenceNumber seqNum,
-                             std::unique_ptr<VectorLogPtr> files,
-                             DBImpl const* const dbimpl);
+  TransactionLogIteratorImpl(
+      const std::string& dir, const DBOptions* options,
+      const TransactionLogIterator::ReadOptions& read_options,
+      const EnvOptions& soptions, const SequenceNumber seqNum,
+      std::unique_ptr<VectorLogPtr> files, DBImpl const* const dbimpl);
 
   virtual bool Valid();
 
@@ -83,6 +83,7 @@
  private:
   const std::string& dir_;
   const DBOptions* options_;
+  const TransactionLogIterator::ReadOptions read_options_;
   const EnvOptions& soptions_;
   SequenceNumber startingSequenceNumber_;
   std::unique_ptr<VectorLogPtr> files_;
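From the caller's side, the new read_options parameter defaults to checksum verification being on. A sketch of consuming the iterator with verification turned off, using only the APIs introduced or referenced in this patch (assumes `db` is an already-open rocksdb::DB*; error handling elided):

```cpp
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/transaction_log.h"

// Replay all write batches recorded after `since`, skipping checksum checks.
void ReplayUpdates(rocksdb::DB* db, rocksdb::SequenceNumber since) {
  std::unique_ptr<rocksdb::TransactionLogIterator> iter;
  rocksdb::TransactionLogIterator::ReadOptions read_options(
      false /* verify_checksums */);
  rocksdb::Status s = db->GetUpdatesSince(since, &iter, read_options);
  if (!s.ok()) return;
  for (; iter->Valid(); iter->Next()) {
    rocksdb::BatchResult batch = iter->GetBatch();
    // batch.sequence is the first sequence number in the returned batch.
  }
}
```

Because read_options is defaulted everywhere (DB, DBImpl, ModelDB, StackableDB), existing callers compile unchanged and keep the previous verify-always behavior.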
diff --git a/db/version_set.cc b/db/version_set.cc
index 5c18163605..a906beed5b 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -1585,9 +1585,8 @@
     // only one thread can be here at the same time
     if (!new_manifest_filename.empty()) {
       unique_ptr<WritableFile> descriptor_file;
-      s = env_->NewWritableFile(new_manifest_filename,
-                                &descriptor_file,
-                                storage_options_);
+      s = env_->NewWritableFile(new_manifest_filename, &descriptor_file,
+                                storage_options_.AdaptForLogWrite());
       if (s.ok()) {
         descriptor_log_.reset(new log::Writer(std::move(descriptor_file)));
         s = WriteSnapshot(descriptor_log_.get());
@@ -2615,7 +2614,6 @@ ColumnFamilyData* VersionSet::CreateColumnFamily(
   AppendVersion(new_cfd, new Version(new_cfd, this, current_version_number_++));
   new_cfd->CreateNewMemtable();
   new_cfd->SetLogNumber(edit->log_number_);
-  delete new_cfd->InstallSuperVersion(new SuperVersion());
   return new_cfd;
 }
 
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index 7bd0c7c8a4..34fbb065d2 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -420,8 +420,10 @@
   // use this api, else the WAL files will get
   // cleared aggressively and the iterator might keep getting invalid before
   // an update is read.
-  virtual Status GetUpdatesSince(SequenceNumber seq_number,
-                                 unique_ptr<TransactionLogIterator>* iter) = 0;
+  virtual Status GetUpdatesSince(
+      SequenceNumber seq_number, unique_ptr<TransactionLogIterator>* iter,
+      const TransactionLogIterator::ReadOptions&
+          read_options = TransactionLogIterator::ReadOptions()) = 0;
 
   // Delete the file name from the db directory and update the internal state to
   // reflect that. Supports deletion of sst and log files only. 'name' must be
diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h
index f5aac92e7b..c6e836957c 100644
--- a/include/rocksdb/env.h
+++ b/include/rocksdb/env.h
@@ -49,6 +49,8 @@ struct EnvOptions {
   // construct from Options
   explicit EnvOptions(const DBOptions& options);
 
+  EnvOptions AdaptForLogWrite() const;
+
   // If true, then allow caching of data in environment buffers
   bool use_os_buffer = true;
 
@@ -511,25 +513,56 @@ class Directory {
   virtual Status Fsync() = 0;
 };
 
+enum InfoLogLevel {
+  DEBUG = 0,
+  INFO,
+  WARN,
+  ERROR,
+  FATAL,
+  NUM_INFO_LOG_LEVELS,
+};
+
 // An interface for writing log messages.
 class Logger {
  public:
   enum { DO_NOT_SUPPORT_GET_LOG_FILE_SIZE = -1 };
-  Logger() { }
+  explicit Logger(const InfoLogLevel log_level = InfoLogLevel::ERROR)
+      : log_level_(log_level) {}
   virtual ~Logger();
 
   // Write an entry to the log file with the specified format.
   virtual void Logv(const char* format, va_list ap) = 0;
+
+  // Write an entry to the log file with the specified log level
+  // and format. Any log with a level below the internal log level
+  // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
+  // printed.
+  void Logv(const InfoLogLevel log_level, const char* format, va_list ap) {
+    static const char* kInfoLogLevelNames[5] = {"DEBUG", "INFO", "WARN",
+                                                "ERROR", "FATAL"};
+    if (log_level < log_level_) {
+      return;
+    }
+    char new_format[500];
+    snprintf(new_format, sizeof(new_format) - 1, "[%s] %s",
+             kInfoLogLevelNames[log_level], format);
+    Logv(new_format, ap);
+  }
+
   virtual size_t GetLogFileSize() const {
     return DO_NOT_SUPPORT_GET_LOG_FILE_SIZE;
   }
   // Flush to the OS buffers
   virtual void Flush() {}
+  virtual InfoLogLevel GetInfoLogLevel() const { return log_level_; }
+  virtual void SetInfoLogLevel(const InfoLogLevel log_level) {
+    log_level_ = log_level;
+  }
 
  private:
   // No copying allowed
   Logger(const Logger&);
   void operator=(const Logger&);
+  InfoLogLevel log_level_;
 };
 
@@ -547,7 +580,18 @@ class FileLock {
 
 extern void LogFlush(const shared_ptr<Logger>& info_log);
 
+extern void Log(const InfoLogLevel log_level,
+                const shared_ptr<Logger>& info_log, const char* format, ...);
+
+// a set of log functions with different log levels.
+extern void Debug(const shared_ptr<Logger>& info_log, const char* format, ...);
+extern void Info(const shared_ptr<Logger>& info_log, const char* format, ...);
+extern void Warn(const shared_ptr<Logger>& info_log, const char* format, ...);
+extern void Error(const shared_ptr<Logger>& info_log, const char* format, ...);
+extern void Fatal(const shared_ptr<Logger>& info_log, const char* format, ...);
+
 // Log the specified data to *info_log if info_log is non-nullptr.
+// The default info log level is InfoLogLevel::ERROR.
 extern void Log(const shared_ptr<Logger>& info_log, const char* format, ...)
 #   if defined(__GNUC__) || defined(__clang__)
     __attribute__((__format__ (__printf__, 2, 3)))
@@ -556,12 +600,23 @@
 
 extern void LogFlush(Logger* info_log);
 
+extern void Log(const InfoLogLevel log_level, Logger* info_log,
+                const char* format, ...);
+
+// The default info log level is InfoLogLevel::ERROR.
 extern void Log(Logger* info_log, const char* format, ...)
 #   if defined(__GNUC__) || defined(__clang__)
     __attribute__((__format__ (__printf__, 2, 3)))
 #   endif
     ;
 
+// a set of log functions with different log levels.
+extern void Debug(Logger* info_log, const char* format, ...);
+extern void Info(Logger* info_log, const char* format, ...);
+extern void Warn(Logger* info_log, const char* format, ...);
+extern void Error(Logger* info_log, const char* format, ...);
+extern void Fatal(Logger* info_log, const char* format, ...);
+
 // A utility routine: write "data" to the named file.
 extern Status WriteStringToFile(Env* env, const Slice& data,
                                 const std::string& fname);
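A quick illustration of how the leveled logging API above is meant to be used. The setup is hypothetical caller code, but the functions invoked are exactly the ones declared in this header:

```cpp
#include "rocksdb/env.h"

// Emit messages at different levels; anything below the logger's configured
// level is dropped by Logger::Logv(log_level, format, ap) before formatting.
void LogExamples(rocksdb::Logger* logger) {
  logger->SetInfoLogLevel(rocksdb::InfoLogLevel::WARN);
  rocksdb::Debug(logger, "compaction picked %d files", 4);  // filtered out
  rocksdb::Warn(logger, "write stall: %d L0 files", 12);    // printed as [WARN] ...
  rocksdb::Log(rocksdb::InfoLogLevel::ERROR, logger, "manifest write failed");
  rocksdb::Log(logger, "plain Log() defaults to the ERROR level");
}
```

Note that the plain Log() overloads keep their old signature, so existing call sites (including the many Log(options_.info_log, ...) calls in this patch) are unaffected.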
diff --git a/include/rocksdb/flush_block_policy.h b/include/rocksdb/flush_block_policy.h
index 1740d879c6..8340ad616e 100644
--- a/include/rocksdb/flush_block_policy.h
+++ b/include/rocksdb/flush_block_policy.h
@@ -11,6 +11,7 @@
 namespace rocksdb {
 
 class Slice;
 class BlockBuilder;
+struct Options;
 
 // FlushBlockPolicy provides a configurable way to determine when to flush a
 // block in the block based tables,
@@ -36,29 +37,22 @@ class FlushBlockPolicyFactory {
   // Callers must delete the result after any database that is using the
   // result has been closed.
   virtual FlushBlockPolicy* NewFlushBlockPolicy(
-      const BlockBuilder& data_block_builder) const = 0;
+      const Options& options, const BlockBuilder& data_block_builder) const = 0;
 
   virtual ~FlushBlockPolicyFactory() { }
 };
 
 class FlushBlockBySizePolicyFactory : public FlushBlockPolicyFactory {
  public:
-  FlushBlockBySizePolicyFactory(const uint64_t block_size,
-                                const uint64_t block_size_deviation) :
-      block_size_(block_size),
-      block_size_deviation_(block_size_deviation) {
-  }
+  FlushBlockBySizePolicyFactory() {}
 
   virtual const char* Name() const override {
     return "FlushBlockBySizePolicyFactory";
   }
 
   virtual FlushBlockPolicy* NewFlushBlockPolicy(
+      const Options& options,
       const BlockBuilder& data_block_builder) const override;
-
- private:
-  const uint64_t block_size_;
-  const uint64_t block_size_deviation_;
 };
 
 }  // rocksdb
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 2caba3cc67..3bbfc2f6a9 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -717,6 +717,10 @@ struct DBOptions {
   // Default: 0
   uint64_t bytes_per_sync;
 
+  // Allow RocksDB to use thread local storage to optimize performance.
+  // Default: true
+  bool allow_thread_local;
+
   // Create DBOptions with default values for all fields
   DBOptions();
   // Create DBOptions from Options
diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h
index 24384e9ce5..82cc7133fd 100644
--- a/include/rocksdb/statistics.h
+++ b/include/rocksdb/statistics.h
@@ -122,6 +122,7 @@ enum Tickers {
   // Number of table's properties loaded directly from file, without creating
   // table reader object.
   NUMBER_DIRECT_LOAD_TABLE_PROPERTIES,
+  NUMBER_SUPERVERSION_UPDATES,
   TICKER_ENUM_MAX
 };
 
@@ -176,7 +177,9 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
     {COMPACT_READ_BYTES, "rocksdb.compact.read.bytes"},
     {COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes"},
     {NUMBER_DIRECT_LOAD_TABLE_PROPERTIES,
-     "rocksdb.number.direct.load.table.properties"}, };
+     "rocksdb.number.direct.load.table.properties"},
+    {NUMBER_SUPERVERSION_UPDATES, "rocksdb.number.superversion_updates"},
+};
 
 /**
  * Keep adding histogram's here.
diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h
index 5c04257ffc..e350c77807 100644
--- a/include/rocksdb/table.h
+++ b/include/rocksdb/table.h
@@ -54,6 +54,21 @@ struct BlockBasedTableOptions {
   // If not specified, each "table reader" object will pre-load index/filter
   // block during table initialization.
   bool cache_index_and_filter_blocks = false;
+
+  // The index type that will be used for this table.
+  enum IndexType : char {
+    // A space efficient index block that is optimized for
+    // binary-search-based index.
+    kBinarySearch,
+  };
+
+  IndexType index_type = kBinarySearch;
+};
+
+// Table properties that are specific to block-based tables.
+struct BlockBasedTablePropertyNames {
+  // The value of this property is a fixed-width int32 number.
+  static const std::string kIndexType;
 };
 
 // Create default block based table factory.
diff --git a/include/rocksdb/transaction_log.h b/include/rocksdb/transaction_log.h
index 41a3250d8d..30443bba55 100644
--- a/include/rocksdb/transaction_log.h
+++ b/include/rocksdb/transaction_log.h
@@ -85,6 +85,19 @@ class TransactionLogIterator {
   // earliest transaction contained in the batch.
   // ONLY use if Valid() is true and status() is OK.
   virtual BatchResult GetBatch() = 0;
+
+  // The read options for TransactionLogIterator.
+  struct ReadOptions {
+    // If true, all data read from underlying storage will be
+    // verified against corresponding checksums.
+    // Default: true
+    bool verify_checksums_;
+
+    ReadOptions() : verify_checksums_(true) {}
+
+    explicit ReadOptions(bool verify_checksums)
+        : verify_checksums_(verify_checksums) {}
+  };
 };
 
 }  // namespace rocksdb
diff --git a/include/utilities/stackable_db.h b/include/utilities/stackable_db.h
index abb2517ce0..4d56890f55 100644
--- a/include/utilities/stackable_db.h
+++ b/include/utilities/stackable_db.h
@@ -188,10 +188,10 @@ class StackableDB : public DB {
     return db_->GetPropertiesOfAllTables(column_family, props);
   }
 
-  virtual Status GetUpdatesSince(SequenceNumber seq_number,
-                                 unique_ptr<TransactionLogIterator>* iter)
-    override {
-      return db_->GetUpdatesSince(seq_number, iter);
+  virtual Status GetUpdatesSince(
+      SequenceNumber seq_number, unique_ptr<TransactionLogIterator>* iter,
+      const TransactionLogIterator::ReadOptions& read_options) override {
+    return db_->GetUpdatesSince(seq_number, iter, read_options);
   }
 
   virtual ColumnFamilyHandle* DefaultColumnFamily() const override {
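Taken together, the table-related knobs above are wired up from caller code roughly as follows. This is a sketch, not part of the patch; it assumes the pre-existing Options::block_size and Options::table_factory fields of this RocksDB era:

```cpp
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"

// Configure a block-based table with the new index_type knob and the now
// parameterless FlushBlockBySizePolicyFactory. Block size and deviation are
// read from Options at table-build time via NewFlushBlockPolicy(options, ...),
// instead of being baked into the factory at construction.
rocksdb::Options MakeOptions() {
  rocksdb::BlockBasedTableOptions table_options;
  table_options.index_type = rocksdb::BlockBasedTableOptions::kBinarySearch;
  table_options.flush_block_policy_factory.reset(
      new rocksdb::FlushBlockBySizePolicyFactory());

  rocksdb::Options options;
  options.block_size = 16 * 1024;  // consulted by the flush policy per table
  options.table_factory.reset(
      rocksdb::NewBlockBasedTableFactory(table_options));
  return options;
}
```

This is also why the factory change below (block_based_table_factory.cc) can drop the create-and-delete dance around a temporary policy factory: the default factory no longer captures per-table sizes.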
diff --git a/port/likely.h b/port/likely.h
new file mode 100644
index 0000000000..ede0df5a15
--- /dev/null
+++ b/port/likely.h
@@ -0,0 +1,21 @@
+//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under the BSD-style license found in the
+//  LICENSE file in the root directory of this source tree. An additional grant
+//  of patent rights can be found in the PATENTS file in the same directory.
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#ifndef PORT_LIKELY_H_
+#define PORT_LIKELY_H_
+
+#if defined(__GNUC__) && __GNUC__ >= 4
+#define LIKELY(x)   (__builtin_expect((x), 1))
+#define UNLIKELY(x) (__builtin_expect((x), 0))
+#else
+#define LIKELY(x)   (x)
+#define UNLIKELY(x) (x)
+#endif
+
+#endif  // PORT_LIKELY_H_
diff --git a/table/block.h b/table/block.h
index 7fac006572..6d74bb417a 100644
--- a/table/block.h
+++ b/table/block.h
@@ -26,8 +26,8 @@ class Block {
   ~Block();
 
   size_t size() const { return size_; }
-  bool isCachable() const { return cachable_; }
-  CompressionType compressionType() const { return compression_type_; }
+  bool cachable() const { return cachable_; }
+  CompressionType compression_type() const { return compression_type_; }
   Iterator* NewIterator(const Comparator* comparator);
   const char* data() { return data_; }
 
diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc
index 75f204670e..de2466605e 100644
--- a/table/block_based_table_builder.cc
+++ b/table/block_based_table_builder.cc
@@ -11,23 +11,29 @@
 
 #include <assert.h>
 #include <inttypes.h>
-#include <map>
 #include <stdio.h>
 
-#include "rocksdb/flush_block_policy.h"
+#include <map>
+#include <memory>
+
+#include "db/dbformat.h"
+
 #include "rocksdb/cache.h"
 #include "rocksdb/comparator.h"
-#include "table/table_builder.h"
 #include "rocksdb/env.h"
 #include "rocksdb/filter_policy.h"
+#include "rocksdb/flush_block_policy.h"
 #include "rocksdb/options.h"
-#include "db/dbformat.h"
-#include "table/block_based_table_reader.h"
+#include "rocksdb/table.h"
+
 #include "table/block.h"
+#include "table/block_based_table_reader.h"
 #include "table/block_builder.h"
 #include "table/filter_block.h"
 #include "table/format.h"
 #include "table/meta_blocks.h"
+#include "table/table_builder.h"
+
 #include "util/coding.h"
 #include "util/crc32c.h"
 #include "util/stop_watch.h"
@@ -36,11 +42,167 @@
 namespace rocksdb {
 
 namespace {
 
-static bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
+typedef BlockBasedTableOptions::IndexType IndexType;
+
+// The interface for building an index.
+// Instructions for adding a new concrete IndexBuilder:
+//  1. Create a subclass instantiated from IndexBuilder.
+//  2. Add a new entry associated with that subclass in TableOptions::IndexType.
+//  3. Add a create function for the new subclass in CreateIndexBuilder.
+// Note: we could devise a more advanced design to simplify the process of
+// adding a new subclass, but that would, on the other hand, increase code
+// complexity and draw unwanted attention from readers. Given that we won't
+// add/change indexes frequently, it makes sense to just embrace a more
+// straightforward design that just works.
+class IndexBuilder {
+ public:
+  explicit IndexBuilder(const Comparator* comparator)
+      : comparator_(comparator) {}
+
+  virtual ~IndexBuilder() {}
+
+  // Add a new index entry to the index block.
+  // To allow further optimization, we provide `last_key_in_current_block` and
+  // `first_key_in_next_block`, based on which the specific implementation can
+  // determine the best index key to be used for the index block.
+  // @last_key_in_current_block: this parameter may be overridden with the
+  //                             value of a "substitute key".
+  // @first_key_in_next_block: it will be nullptr if the entry being added is
+  //                           the last one in the table
+  //
+  // REQUIRES: Finish() has not yet been called.
+  virtual void AddEntry(std::string* last_key_in_current_block,
+                        const Slice* first_key_in_next_block,
+                        const BlockHandle& block_handle) = 0;
+
+  // Inform the index builder that all entries have been written. The block
+  // builder may therefore perform any operation required for block
+  // finalization.
+  //
+  // REQUIRES: Finish() has not yet been called.
+  virtual Slice Finish() = 0;
+
+  // Get the estimated size for the index block.
+  virtual size_t EstimatedSize() const = 0;
+
+ protected:
+  const Comparator* comparator_;
+};
+
+// This index builder builds a space-efficient index block for
+// binary-search-based lookup.
+//
+// Optimizations:
+//  1. Set the block's `block_restart_interval` to 1, which avoids a linear
+//     search when doing an index lookup.
+//  2. Shorten the key length for the index block. Rather than using the last
+//     key in the data block verbatim as the index key, we find the shortest
+//     substitute key that serves the same function.
+class BinarySearchIndexBuilder : public IndexBuilder {
+ public:
+  explicit BinarySearchIndexBuilder(const Comparator* comparator)
+      : IndexBuilder(comparator),
+        index_block_builder_(1 /* block_restart_interval == 1 */, comparator) {}
+
+  virtual void AddEntry(std::string* last_key_in_current_block,
+                        const Slice* first_key_in_next_block,
+                        const BlockHandle& block_handle) override {
+    if (first_key_in_next_block != nullptr) {
+      comparator_->FindShortestSeparator(last_key_in_current_block,
+                                         *first_key_in_next_block);
+    } else {
+      comparator_->FindShortSuccessor(last_key_in_current_block);
+    }
+
+    std::string handle_encoding;
+    block_handle.EncodeTo(&handle_encoding);
+    index_block_builder_.Add(*last_key_in_current_block, handle_encoding);
+  }
+
+  virtual Slice Finish() override { return index_block_builder_.Finish(); }
+
+  virtual size_t EstimatedSize() const {
+    return index_block_builder_.CurrentSizeEstimate();
+  }
+
+ private:
+  BlockBuilder index_block_builder_;
+};
+
+// Create an index builder based on its type.
+IndexBuilder* CreateIndexBuilder(IndexType type, const Comparator* comparator) {
+  switch (type) {
+    case BlockBasedTableOptions::kBinarySearch: {
+      return new BinarySearchIndexBuilder(comparator);
+    }
+    default: {
+      assert(!"Do not recognize the index type ");
+      return nullptr;
+    }
+  }
+  // impossible.
+  assert(false);
+  return nullptr;
+}
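The "substitute key" trick used by BinarySearchIndexBuilder above comes straight from the Comparator interface. A small standalone illustration with the default BytewiseComparator (the sample keys and printed results are illustrative, based on its documented shortening behavior):

```cpp
#include <iostream>
#include <string>

#include "rocksdb/comparator.h"
#include "rocksdb/slice.h"

// The stored index key only needs to sort >= the block's last key and
// < the next block's first key, so it can be much shorter than either.
int main() {
  const rocksdb::Comparator* cmp = rocksdb::BytewiseComparator();

  std::string last_key_in_block = "apple";
  cmp->FindShortestSeparator(&last_key_in_block, rocksdb::Slice("cucumber"));
  std::cout << last_key_in_block << "\n";  // "b": one byte separates the blocks

  std::string last_key_in_table = "zebra";
  cmp->FindShortSuccessor(&last_key_in_table);
  std::cout << last_key_in_table << "\n";  // "{": shortest key > "zebra"
}
```

Combined with block_restart_interval == 1 (every index entry fully restarted, hence directly binary-searchable), this is what makes the index block both small and fast to probe.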
+
+bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
   // Check to see if compressed less than 12.5%
   return compressed_size < raw_size - (raw_size / 8u);
 }
 
+Slice CompressBlock(const Slice& raw,
+                    const CompressionOptions& compression_options,
+                    CompressionType* type, std::string* compressed_output) {
+  if (*type == kNoCompression) {
+    return raw;
+  }
+
+  // Will return compressed block contents if (1) the compression method is
+  // supported in this platform and (2) the compression rate is "good enough".
+  switch (*type) {
+    case kSnappyCompression:
+      if (port::Snappy_Compress(compression_options, raw.data(), raw.size(),
+                                compressed_output) &&
+          GoodCompressionRatio(compressed_output->size(), raw.size())) {
+        return *compressed_output;
+      }
+      break;  // fall back to no compression.
+    case kZlibCompression:
+      if (port::Zlib_Compress(compression_options, raw.data(), raw.size(),
+                              compressed_output) &&
+          GoodCompressionRatio(compressed_output->size(), raw.size())) {
+        return *compressed_output;
+      }
+      break;  // fall back to no compression.
+    case kBZip2Compression:
+      if (port::BZip2_Compress(compression_options, raw.data(), raw.size(),
+                               compressed_output) &&
+          GoodCompressionRatio(compressed_output->size(), raw.size())) {
+        return *compressed_output;
+      }
+      break;  // fall back to no compression.
+    case kLZ4Compression:
+      if (port::LZ4_Compress(compression_options, raw.data(), raw.size(),
+                             compressed_output) &&
+          GoodCompressionRatio(compressed_output->size(), raw.size())) {
+        return *compressed_output;
+      }
+      break;  // fall back to no compression.
+    case kLZ4HCCompression:
+      if (port::LZ4HC_Compress(compression_options, raw.data(), raw.size(),
+                               compressed_output) &&
+          GoodCompressionRatio(compressed_output->size(), raw.size())) {
+        return *compressed_output;
+      }
+      break;  // fall back to no compression.
+    default: {}  // Do not recognize this compression type
+  }
+
+  // Compression method is not supported, or the compression ratio is not good
+  // enough, so just fall back to uncompressed form.
+  *type = kNoCompression;
+  return raw;
+}
+
 }  // anonymous namespace
 
 // kBlockBasedTableMagicNumber was picked by running
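The "good enough" threshold in GoodCompressionRatio requires saving more than one eighth (12.5%) of the raw size; for a 4096-byte block, the compressed output must come in under 3584 bytes or the block is stored uncompressed. A tiny self-check of that arithmetic:

```cpp
#include <cassert>
#include <cstddef>

// Mirrors GoodCompressionRatio above: keep the compressed form only when it
// saves at least raw_size/8 bytes.
bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
  return compressed_size < raw_size - (raw_size / 8u);
}

int main() {
  assert(GoodCompressionRatio(3583, 4096));   // saves more than 512 bytes: kept
  assert(!GoodCompressionRatio(3584, 4096));  // exactly 12.5% saved: rejected
  assert(!GoodCompressionRatio(4000, 4096));  // too little saving: rejected
}
```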
@@ -51,6 +213,46 @@ static bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
 extern const uint64_t kBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull;
 
+// A collector that collects properties of interest to block-based tables.
+// For now this class looks heavy-weight since we only write one additional
+// property.
+// But in the foreseeable future, we will add more and more properties that are
+// specific to block-based tables.
+class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
+    : public TablePropertiesCollector {
+ public:
+  BlockBasedTablePropertiesCollector(
+      BlockBasedTableOptions::IndexType index_type)
+      : index_type_(index_type) {}
+
+  virtual Status Add(const Slice& key, const Slice& value) {
+    // Intentionally left blank. Have no interest in collecting stats for
+    // individual key/value pairs.
+    return Status::OK();
+  }
+
+  virtual Status Finish(UserCollectedProperties* properties) {
+    std::string val;
+    PutFixed32(&val, static_cast<uint32_t>(index_type_));
+    properties->insert({BlockBasedTablePropertyNames::kIndexType, val});
+
+    return Status::OK();
+  }
+
+  // The name of the properties collector can be used for debugging purposes.
+  virtual const char* Name() const {
+    return "BlockBasedTablePropertiesCollector";
+  }
+
+  virtual UserCollectedProperties GetReadableProperties() const {
+    // Intentionally left blank.
+    return UserCollectedProperties();
+  }
+
+ private:
+  BlockBasedTableOptions::IndexType index_type_;
+};
+
 struct BlockBasedTableBuilder::Rep {
   Options options;
   const InternalKeyComparator& internal_comparator;
   WritableFile* file;
   uint64_t offset = 0;
   Status status;
   BlockBuilder data_block;
-  BlockBuilder index_block;
+  std::unique_ptr<IndexBuilder> index_builder;
+
   std::string last_key;
   CompressionType compression_type;
   TableProperties props;
@@ -75,28 +278,31 @@
 
   Rep(const Options& opt, const InternalKeyComparator& icomparator,
       WritableFile* f, FlushBlockPolicyFactory* flush_block_policy_factory,
-      CompressionType compression_type)
+      CompressionType compression_type, IndexType index_block_type)
       : options(opt),
         internal_comparator(icomparator),
         file(f),
         data_block(options, &internal_comparator),
-        // To avoid linear scan, we make the block_restart_interval to be `1`
-        // in index block builder
-        index_block(1 /* block_restart_interval */, &internal_comparator),
+        index_builder(
+            CreateIndexBuilder(index_block_type, &internal_comparator)),
         compression_type(compression_type),
         filter_block(opt.filter_policy == nullptr
                          ? nullptr
                          : new FilterBlockBuilder(opt, &internal_comparator)),
-        flush_block_policy(
-            flush_block_policy_factory->NewFlushBlockPolicy(data_block)) {}
+        flush_block_policy(flush_block_policy_factory->NewFlushBlockPolicy(
+            options, data_block)) {
+    options.table_properties_collectors.push_back(
+        std::make_shared<BlockBasedTablePropertiesCollector>(index_block_type));
+  }
 };
 
 BlockBasedTableBuilder::BlockBasedTableBuilder(
-    const Options& options, const InternalKeyComparator& internal_comparator,
-    WritableFile* file, FlushBlockPolicyFactory* flush_block_policy_factory,
+    const Options& options, const BlockBasedTableOptions& table_options,
+    const InternalKeyComparator& internal_comparator, WritableFile* file,
     CompressionType compression_type)
     : rep_(new Rep(options, internal_comparator, file,
-                   flush_block_policy_factory, compression_type)) {
+                   table_options.flush_block_policy_factory.get(),
+                   compression_type, table_options.index_type)) {
   if (rep_->filter_block != nullptr) {
     rep_->filter_block->StartBlock(0);
   }
@@ -136,10 +342,7 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
     // entries in the first block and < all entries in subsequent
     // blocks.
     if (ok()) {
-      r->internal_comparator.FindShortestSeparator(&r->last_key, key);
-      std::string handle_encoding;
-      r->pending_handle.EncodeTo(&handle_encoding);
-      r->index_block.Add(r->last_key, Slice(handle_encoding));
+      r->index_builder->AddEntry(&r->last_key, &key, r->pending_handle);
     }
   }
 
@@ -179,88 +382,25 @@
 
 void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
                                         BlockHandle* handle) {
+  WriteBlock(block->Finish(), handle);
+  block->Reset();
+}
+
+void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
+                                        BlockHandle* handle) {
   // File format contains a sequence of blocks where each block has:
   //    block_data: uint8[n]
   //    type: uint8
   //    crc: uint32
   assert(ok());
   Rep* r = rep_;
-  Slice raw = block->Finish();
-
-  Slice block_contents;
-  std::string* compressed = &r->compressed_output;
-  CompressionType type = r->compression_type;
-  switch (type) {
-    case kNoCompression:
-      block_contents = raw;
-      break;
-
-    case kSnappyCompression: {
-      std::string* compressed = &r->compressed_output;
-      if (port::Snappy_Compress(r->options.compression_opts, raw.data(),
-                                raw.size(), compressed) &&
-          GoodCompressionRatio(compressed->size(), raw.size())) {
-        block_contents = *compressed;
-      } else {
-        // Snappy not supported, or not good compression ratio, so just
-        // store uncompressed form
-        block_contents = raw;
-        type = kNoCompression;
-      }
-      break;
-    }
-    case kZlibCompression:
-      if (port::Zlib_Compress(r->options.compression_opts, raw.data(),
-                              raw.size(), compressed) &&
-          GoodCompressionRatio(compressed->size(), raw.size())) {
-        block_contents = *compressed;
-      } else {
-        // Zlib not supported, or not good compression ratio, so just
-        // store uncompressed form
-        block_contents = raw;
-        type = kNoCompression;
-      }
-      break;
-    case kBZip2Compression:
-      if (port::BZip2_Compress(r->options.compression_opts, raw.data(),
-                               raw.size(), compressed) &&
-          GoodCompressionRatio(compressed->size(), raw.size())) {
-        block_contents = *compressed;
-      } else {
-        // BZip not supported, or not good compression ratio, so just
-        // store uncompressed form
-        block_contents = raw;
-        type = kNoCompression;
-      }
-      break;
-    case kLZ4Compression:
-      if (port::LZ4_Compress(r->options.compression_opts, raw.data(),
-                             raw.size(), compressed) &&
-          GoodCompressionRatio(compressed->size(), raw.size())) {
-        block_contents = *compressed;
-      } else {
-        // LZ4 not supported, or not good compression ratio, so just
-        // store uncompressed form
-        block_contents = raw;
-        type = kNoCompression;
-      }
-      break;
-    case kLZ4HCCompression:
-      if (port::LZ4HC_Compress(r->options.compression_opts, raw.data(),
-                               raw.size(), compressed) &&
-          GoodCompressionRatio(compressed->size(), raw.size())) {
-        block_contents = *compressed;
-      } else {
-        // LZ4 not supported, or not good compression ratio, so just
-        // store uncompressed form
-        block_contents = raw;
-        type = kNoCompression;
-      }
-      break;
-  }
+  auto type = r->compression_type;
+  auto block_contents =
+      CompressBlock(raw_block_contents, r->options.compression_opts, &type,
+                    &r->compressed_output);
   WriteRawBlock(block_contents, type, handle);
   r->compressed_output.clear();
-  block->Reset();
 }
 
@@ -364,11 +504,8 @@ Status BlockBasedTableBuilder::Finish() {
   // block, we will finish writing all index entries here and flush them
   // to storage after metaindex block is written.
   if (ok() && !empty_data_block) {
-    r->internal_comparator.FindShortSuccessor(&r->last_key);
-
-    std::string handle_encoding;
-    r->pending_handle.EncodeTo(&handle_encoding);
-    r->index_block.Add(r->last_key, handle_encoding);
+    r->index_builder->AddEntry(&r->last_key, nullptr /* no next data block */,
+                               r->pending_handle);
   }
 
   // Write meta blocks and metaindex block with the following order.
@@ -394,11 +531,12 @@
       r->props.filter_policy_name = r->options.filter_policy != nullptr ?
           r->options.filter_policy->Name() : "";
       r->props.index_size =
-          r->index_block.CurrentSizeEstimate() + kBlockTrailerSize;
+          r->index_builder->EstimatedSize() + kBlockTrailerSize;
 
       // Add basic properties
       property_block_builder.AddTableProperty(r->props);
 
+      // Add user collected properties
       NotifyCollectTableCollectorsOnFinish(
           r->options.table_properties_collectors,
           r->options.info_log.get(),
@@ -425,7 +563,7 @@
 
   // Write index block
   if (ok()) {
-    WriteBlock(&r->index_block, &index_block_handle);
+    WriteBlock(r->index_builder->Finish(), &index_block_handle);
   }
 
   // Write footer
diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h
index 1c4be1f830..5871427c6e 100644
--- a/table/block_based_table_builder.h
+++ b/table/block_based_table_builder.h
@@ -9,6 +9,7 @@
 #pragma once
 
 #include <stdint.h>
+
 #include "rocksdb/flush_block_policy.h"
 #include "rocksdb/options.h"
 #include "rocksdb/status.h"
@@ -19,6 +20,7 @@ namespace rocksdb {
 class BlockBuilder;
 class BlockHandle;
 class WritableFile;
+struct BlockBasedTableOptions;
 
 class BlockBasedTableBuilder : public TableBuilder {
  public:
@@ -26,10 +28,9 @@
   // building in *file.  Does not close the file.  It is up to the
   // caller to close the file after calling Finish().
   BlockBasedTableBuilder(const Options& options,
+                         const BlockBasedTableOptions& table_options,
                          const InternalKeyComparator& internal_comparator,
-                         WritableFile* file,
-                         FlushBlockPolicyFactory* flush_block_policy_factory,
-                         CompressionType compression_type);
+                         WritableFile* file, CompressionType compression_type);
 
   // REQUIRES: Either Finish() or Abandon() has been called.
   ~BlockBasedTableBuilder();
@@ -63,11 +64,17 @@
  private:
   bool ok() const { return status().ok(); }
+  // Call the block's Finish() method and then write the finalized block
+  // contents to file.
   void WriteBlock(BlockBuilder* block, BlockHandle* handle);
+  // Directly write block content to the file.
+  void WriteBlock(const Slice& block_contents, BlockHandle* handle);
   void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle);
   Status InsertBlockInCache(const Slice& block_contents,
-                      const CompressionType type, const BlockHandle* handle);
+                            const CompressionType type,
+                            const BlockHandle* handle);
   struct Rep;
+  class BlockBasedTablePropertiesCollector;
   Rep* rep_;
 
   // Advanced operation: flush any buffered key/value pairs to file.
@@ -82,4 +89,3 @@
 };
 
 }  // namespace rocksdb
-
diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc
index 6a4a644629..822adee226 100644
--- a/table/block_based_table_factory.cc
+++ b/table/block_based_table_factory.cc
@@ -11,13 +11,25 @@
 #include "table/block_based_table_factory.h"
 
 #include <memory>
+#include <string>
 #include <stdint.h>
+
+#include "rocksdb/flush_block_policy.h"
 #include "table/block_based_table_builder.h"
 #include "table/block_based_table_reader.h"
 #include "port/port.h"
 
 namespace rocksdb {
 
+BlockBasedTableFactory::BlockBasedTableFactory(
+    const BlockBasedTableOptions& table_options)
+    : table_options_(table_options) {
+  if (table_options_.flush_block_policy_factory == nullptr) {
+    table_options_.flush_block_policy_factory.reset(
+        new FlushBlockBySizePolicyFactory());
+  }
+}
+
 Status BlockBasedTableFactory::NewTableReader(
     const Options& options, const EnvOptions& soptions,
     const InternalKeyComparator& internal_comparator,
@@ -31,34 +43,8 @@
 TableBuilder* BlockBasedTableFactory::NewTableBuilder(
     const Options& options, const InternalKeyComparator& internal_comparator,
     WritableFile* file, CompressionType compression_type) const {
-  auto flush_block_policy_factory =
-      table_options_.flush_block_policy_factory.get();
-
-  // if flush block policy factory is not set, we'll create the default one
-  // from the options.
-  //
-  // NOTE: we cannot pre-cache the "default block policy factory" because
-  // `FlushBlockBySizePolicyFactory` takes `options.block_size` and
-  // `options.block_size_deviation` as parameters, which may be different
-  // every time.
-  if (flush_block_policy_factory == nullptr) {
-    flush_block_policy_factory =
-        new FlushBlockBySizePolicyFactory(options.block_size,
-                                          options.block_size_deviation);
-  }
-
-  auto table_builder =
-      new BlockBasedTableBuilder(options, internal_comparator, file,
-                                 flush_block_policy_factory, compression_type);
-
-  // Delete flush_block_policy_factory only when it's just created from the
-  // options.
-  // We can safely delete flush_block_policy_factory since it will only be used
-  // during the construction of `BlockBasedTableBuilder`.
-  if (flush_block_policy_factory !=
-      table_options_.flush_block_policy_factory.get()) {
-    delete flush_block_policy_factory;
-  }
+  auto table_builder = new BlockBasedTableBuilder(
+      options, table_options_, internal_comparator, file, compression_type);
 
   return table_builder;
 }
@@ -68,4 +54,7 @@
   return new BlockBasedTableFactory(table_options);
 }
 
+const std::string BlockBasedTablePropertyNames::kIndexType =
+    "rocksdb.block.based.table.index.type";
+
 }  // namespace rocksdb
diff --git a/table/block_based_table_factory.h b/table/block_based_table_factory.h
index 5569970650..492349c558 100644
--- a/table/block_based_table_factory.h
+++ b/table/block_based_table_factory.h
@@ -26,8 +26,7 @@ class BlockBasedTableBuilder;
 class BlockBasedTableFactory : public TableFactory {
  public:
   explicit BlockBasedTableFactory(
-      const BlockBasedTableOptions& table_options = BlockBasedTableOptions())
-      : table_options_(table_options) {}
+      const BlockBasedTableOptions& table_options = BlockBasedTableOptions());
 
   ~BlockBasedTableFactory() {}
 
diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc
index 4921d28f4d..c3adf3ac53 100644
--- a/table/block_based_table_reader.cc
+++ b/table/block_based_table_reader.cc
@@ -9,11 +9,16 @@
 
 #include "table/block_based_table_reader.h"
 
+#include <string>
+#include <utility>
+
 #include "db/dbformat.h"
 
+#include "rocksdb/cache.h"
 #include "rocksdb/comparator.h"
 #include "rocksdb/env.h"
 #include "rocksdb/filter_policy.h"
+#include "rocksdb/iterator.h"
 #include "rocksdb/options.h"
 #include "rocksdb/statistics.h"
 #include "rocksdb/table.h"
@@ -31,21 +36,172 @@ namespace rocksdb {
 
 extern uint64_t kBlockBasedTableMagicNumber;
 
+using std::unique_ptr;
+
+typedef BlockBasedTable::IndexReader IndexReader;
+
+namespace {
+
 // The longest the prefix of the cache key used to identify blocks can be.
 // We are using the fact that we know for Posix files the unique ID is three
 // varints.
 const size_t kMaxCacheKeyPrefixSize = kMaxVarint64Length*3+1;
-using std::unique_ptr;
+
+// Read the block identified by "handle" from "file".
+// The only relevant option is options.verify_checksums for now.
+// Set *didIO to true if didIO is not null.
+// On failure return non-OK.
+// On success fill *result and return OK - caller owns *result
+Status ReadBlockFromFile(RandomAccessFile* file, const ReadOptions& options,
+                         const BlockHandle& handle, Block** result, Env* env,
+                         bool* didIO = nullptr, bool do_uncompress = true) {
+  BlockContents contents;
+  Status s =
+      ReadBlockContents(file, options, handle, &contents, env, do_uncompress);
+  if (s.ok()) {
+    *result = new Block(contents);
+  }
+
+  if (didIO != nullptr) {
+    *didIO = true;
+  }
+  return s;
+}
+
+// Delete the resource that is held by the iterator.
+template <class ResourceType>
+void DeleteHeldResource(void* arg, void* ignored) {
+  delete reinterpret_cast<ResourceType*>(arg);
+}
+
+// Delete the entry residing in the cache.
+template <class Entry>
+void DeleteCachedEntry(const Slice& key, void* value) {
+  auto entry = reinterpret_cast<Entry*>(value);
+  delete entry;
+}
+
+// Release the cached entry and decrement its ref count.
+void ReleaseCachedEntry(void* arg, void* h) { + Cache* cache = reinterpret_cast(arg); + Cache::Handle* handle = reinterpret_cast(h); + cache->Release(handle); +} + +Slice GetCacheKey(const char* cache_key_prefix, size_t cache_key_prefix_size, + const BlockHandle& handle, char* cache_key) { + assert(cache_key != nullptr); + assert(cache_key_prefix_size != 0); + assert(cache_key_prefix_size <= kMaxCacheKeyPrefixSize); + memcpy(cache_key, cache_key_prefix, cache_key_prefix_size); + char* end = + EncodeVarint64(cache_key + cache_key_prefix_size, handle.offset()); + return Slice(cache_key, static_cast(end - cache_key)); +} + +Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key, + Tickers block_cache_miss_ticker, + Tickers block_cache_hit_ticker, + Statistics* statistics) { + auto cache_handle = block_cache->Lookup(key); + if (cache_handle != nullptr) { + BumpPerfCount(&perf_context.block_cache_hit_count); + // overall cache hit + RecordTick(statistics, BLOCK_CACHE_HIT); + // block-type specific cache hit + RecordTick(statistics, block_cache_hit_ticker); + } else { + // overall cache miss + RecordTick(statistics, BLOCK_CACHE_MISS); + // block-type specific cache miss + RecordTick(statistics, block_cache_miss_ticker); + } + + return cache_handle; +} + +} // namespace + +// -- IndexReader and its subclasses +// IndexReader is the interface that provides the functionality for index access. +class BlockBasedTable::IndexReader { + public: + explicit IndexReader(const Comparator* comparator) + : comparator_(comparator) {} + + virtual ~IndexReader() {} + + // Create an iterator for index access. + virtual Iterator* NewIterator() = 0; + + // The size of the index. + virtual size_t size() const = 0; + + protected: + const Comparator* comparator_; +}; + +// Index that allows binary search lookup for the first key of each block. +// This class can be viewed as a thin wrapper around the `Block` class, which +// already supports binary search. +class BinarySearchIndexReader : public IndexReader { + public: + // Read the index from the file and create an instance of + // `BinarySearchIndexReader`. + // On success, index_reader will be populated; otherwise it will remain + // unmodified. + static Status Create(RandomAccessFile* file, const BlockHandle& index_handle, + Env* env, const Comparator* comparator, + IndexReader** index_reader) { + Block* index_block = nullptr; + auto s = ReadBlockFromFile(file, ReadOptions(), index_handle, + &index_block, env); + + if (s.ok()) { + *index_reader = new BinarySearchIndexReader(comparator, index_block); + } + + return s; + } + + virtual Iterator* NewIterator() override { + return index_block_->NewIterator(comparator_); + } + + virtual size_t size() const override { return index_block_->size(); } + + private: + BinarySearchIndexReader(const Comparator* comparator, Block* index_block) + : IndexReader(comparator), index_block_(index_block) { + assert(index_block_ != nullptr); + } + std::unique_ptr index_block_; +}; + +// TODO(kailiu) This class is only a stub for now, and the comment below is +// also incomplete. +// Index that leverages an internal hash table to speed up the lookup for a +// given key.
+class HashIndexReader : public IndexReader { + public: + static Status Create(RandomAccessFile* file, const BlockHandle& index_handle, + Env* env, const Comparator* comparator, + BlockBasedTable* table, + const SliceTransform* prefix_extractor, + IndexReader** index_reader) { + return Status::NotSupported("not implemented yet!"); + } +}; + struct BlockBasedTable::Rep { Rep(const EnvOptions& storage_options, const InternalKeyComparator& internal_comparator) - : soptions(storage_options), internal_comparator_(internal_comparator) {} + : soptions(storage_options), internal_comparator(internal_comparator) {} Options options; const EnvOptions& soptions; - const InternalKeyComparator& internal_comparator_; + const InternalKeyComparator& internal_comparator; Status status; unique_ptr file; char cache_key_prefix[kMaxCacheKeyPrefixSize]; @@ -57,12 +213,14 @@ struct BlockBasedTable::Rep { BlockHandle metaindex_handle; // Handle to index: saved from footer BlockHandle index_handle; - // index_block will be populated and used only when options.block_cache is - // NULL; otherwise we will get the index block via the block cache. - unique_ptr index_block; + // index_reader and filter will be populated and used only when + // options.block_cache is nullptr; otherwise we will get the index block via + // the block cache. + unique_ptr index_reader; unique_ptr filter; std::shared_ptr table_properties; + BlockBasedTableOptions::IndexType index_type; }; BlockBasedTable::~BlockBasedTable() { @@ -138,92 +296,6 @@ void BlockBasedTable::GenerateCachePrefix(Cache* cc, } } -namespace { // anonymous namespace, not visible externally - -// Read the block identified by "handle" from "file". -// The only relevant option is options.verify_checksums for now. -// Set *didIO to true if didIO is not null. -// On failure return non-OK. 
-// On success fill *result and return OK - caller owns *result -Status ReadBlockFromFile( - RandomAccessFile* file, - const ReadOptions& options, - const BlockHandle& handle, - Block** result, - Env* env, - bool* didIO = nullptr, - bool do_uncompress = true) { - BlockContents contents; - Status s = ReadBlockContents(file, options, handle, &contents, - env, do_uncompress); - if (s.ok()) { - *result = new Block(contents); - } - - if (didIO) { - *didIO = true; - } - return s; -} - -void DeleteBlock(void* arg, void* ignored) { - delete reinterpret_cast(arg); -} - -void DeleteCachedBlock(const Slice& key, void* value) { - Block* block = reinterpret_cast(value); - delete block; -} - -void DeleteCachedFilter(const Slice& key, void* value) { - auto filter = reinterpret_cast(value); - delete filter; -} - -void ReleaseBlock(void* arg, void* h) { - Cache* cache = reinterpret_cast(arg); - Cache::Handle* handle = reinterpret_cast(h); - cache->Release(handle); -} - -Slice GetCacheKey(const char* cache_key_prefix, - size_t cache_key_prefix_size, - const BlockHandle& handle, - char* cache_key) { - assert(cache_key != nullptr); - assert(cache_key_prefix_size != 0); - assert(cache_key_prefix_size <= kMaxCacheKeyPrefixSize); - memcpy(cache_key, cache_key_prefix, cache_key_prefix_size); - char* end = EncodeVarint64(cache_key + cache_key_prefix_size, - handle.offset()); - return Slice(cache_key, static_cast(end - cache_key)); -} - -Cache::Handle* GetFromBlockCache( - Cache* block_cache, - const Slice& key, - Tickers block_cache_miss_ticker, - Tickers block_cache_hit_ticker, - Statistics* statistics) { - auto cache_handle = block_cache->Lookup(key); - if (cache_handle != nullptr) { - BumpPerfCount(&perf_context.block_cache_hit_count); - // overall cache hit - RecordTick(statistics, BLOCK_CACHE_HIT); - // block-type specific cache hit - RecordTick(statistics, block_cache_hit_ticker); - } else { - // overall cache miss - RecordTick(statistics, BLOCK_CACHE_MISS); - // block-type specific cache miss - RecordTick(statistics, block_cache_miss_ticker); - } - - return cache_handle; -} - -} // end of anonymous namespace - Status BlockBasedTable::Open(const Options& options, const EnvOptions& soptions, const BlockBasedTableOptions& table_options, const InternalKeyComparator& internal_comparator, @@ -243,6 +315,7 @@ Status BlockBasedTable::Open(const Options& options, const EnvOptions& soptions, rep->file = std::move(file); rep->metaindex_handle = footer.metaindex_handle(); rep->index_handle = footer.index_handle(); + rep->index_type = table_options.index_type; SetupCacheKeyPrefix(rep); unique_ptr new_table(new BlockBasedTable(rep)); @@ -273,12 +346,12 @@ Status BlockBasedTable::Open(const Options& options, const EnvOptions& soptions, // Will use block cache for index/filter blocks access? 
if (options.block_cache && table_options.cache_index_and_filter_blocks) { - // Call IndexBlockReader() to implicitly add index to the block_cache - unique_ptr iter(new_table->IndexBlockReader(ReadOptions())); + // Hack: Call NewIndexIterator() to implicitly add index to the block_cache + unique_ptr iter(new_table->NewIndexIterator(ReadOptions())); s = iter->status(); if (s.ok()) { - // Call GetFilter() to implicitly add filter to the block_cache + // Hack: Call GetFilter() to implicitly add filter to the block_cache auto filter_entry = new_table->GetFilter(); filter_entry.Release(options.block_cache.get()); } @@ -286,19 +359,12 @@ Status BlockBasedTable::Open(const Options& options, const EnvOptions& soptions, // If we don't use block cache for index/filter blocks access, we'll // pre-load these blocks, which will be kept in member variables in Rep // and have the same lifetime as this table object. - Block* index_block = nullptr; + IndexReader* index_reader = nullptr; // TODO: we never really verify checksum for index block - s = ReadBlockFromFile( - rep->file.get(), - ReadOptions(), - footer.index_handle(), - &index_block, - options.env - ); + s = new_table->CreateIndexReader(&index_reader); if (s.ok()) { - assert(index_block->compressionType() == kNoCompression); - rep->index_block.reset(index_block); + rep->index_reader.reset(index_reader); // Set filter block if (rep->options.filter_policy) { @@ -311,9 +377,8 @@ Status BlockBasedTable::Open(const Options& options, const EnvOptions& soptions, } } } else { - delete index_block; + delete index_reader; } - } if (s.ok()) { @@ -381,6 +446,129 @@ Status BlockBasedTable::ReadMetaBlock( return Status::OK(); } +Status BlockBasedTable::GetDataBlockFromCache( + const Slice& block_cache_key, const Slice& compressed_block_cache_key, + Cache* block_cache, Cache* block_cache_compressed, Statistics* statistics, + const ReadOptions& read_options, + BlockBasedTable::CachableEntry* block) { + Status s; + Block* compressed_block = nullptr; + Cache::Handle* block_cache_compressed_handle = nullptr; + + // Lookup uncompressed cache first + if (block_cache != nullptr) { + block->cache_handle = + GetEntryFromCache(block_cache, block_cache_key, BLOCK_CACHE_DATA_MISS, + BLOCK_CACHE_DATA_HIT, statistics); + if (block->cache_handle != nullptr) { + block->value = + reinterpret_cast(block_cache->Value(block->cache_handle)); + return s; + } + } + + // If not found, search the compressed block cache.
+ assert(block->cache_handle == nullptr && block->value == nullptr); + + if (block_cache_compressed == nullptr) { + return s; + } + + assert(!compressed_block_cache_key.empty()); + block_cache_compressed_handle = + block_cache_compressed->Lookup(compressed_block_cache_key); + // if we find it in the compressed cache, uncompress it and insert it into + // the uncompressed cache + if (block_cache_compressed_handle == nullptr) { + RecordTick(statistics, BLOCK_CACHE_COMPRESSED_MISS); + return s; + } + + // found compressed block + RecordTick(statistics, BLOCK_CACHE_COMPRESSED_HIT); + compressed_block = reinterpret_cast( + block_cache_compressed->Value(block_cache_compressed_handle)); + assert(compressed_block->compression_type() != kNoCompression); + + // Retrieve the uncompressed contents into a new buffer + BlockContents contents; + s = UncompressBlockContents(compressed_block->data(), + compressed_block->size(), &contents); + + // Insert uncompressed block into block cache + if (s.ok()) { + block->value = new Block(contents); // uncompressed block + assert(block->value->compression_type() == kNoCompression); + if (block_cache != nullptr && block->value->cachable() && + read_options.fill_cache) { + block->cache_handle = + block_cache->Insert(block_cache_key, block->value, + block->value->size(), &DeleteCachedEntry); + assert(reinterpret_cast( + block_cache->Value(block->cache_handle)) == block->value); + } + } + + // Release hold on compressed cache entry + block_cache_compressed->Release(block_cache_compressed_handle); + return s; +} + +Status BlockBasedTable::PutDataBlockToCache( + const Slice& block_cache_key, const Slice& compressed_block_cache_key, + Cache* block_cache, Cache* block_cache_compressed, + const ReadOptions& read_options, Statistics* statistics, + CachableEntry* block, Block* raw_block) { + assert(raw_block->compression_type() == kNoCompression || + block_cache_compressed != nullptr); + + Status s; + // Retrieve the uncompressed contents into a new buffer + BlockContents contents; + if (raw_block->compression_type() != kNoCompression) { + s = UncompressBlockContents(raw_block->data(), raw_block->size(), + &contents); + } + if (!s.ok()) { + delete raw_block; + return s; + } + + if (raw_block->compression_type() != kNoCompression) { + block->value = new Block(contents); // uncompressed block + } else { + block->value = raw_block; + raw_block = nullptr; + } + + // Insert compressed block into compressed block cache. + // Release the hold on the compressed cache entry immediately. + if (block_cache_compressed != nullptr && raw_block != nullptr && + raw_block->cachable()) { + auto cache_handle = block_cache_compressed->Insert( + compressed_block_cache_key, raw_block, raw_block->size(), + &DeleteCachedEntry); + block_cache_compressed->Release(cache_handle); + RecordTick(statistics, BLOCK_CACHE_COMPRESSED_MISS); + // Prevent the code below from deleting this cached block.
+ raw_block = nullptr; + } + delete raw_block; + + // insert into uncompressed block cache + assert((block->value->compression_type() == kNoCompression)); + if (block_cache != nullptr && block->value->cachable()) { + block->cache_handle = + block_cache->Insert(block_cache_key, block->value, block->value->size(), + &DeleteCachedEntry); + RecordTick(statistics, BLOCK_CACHE_ADD); + assert(reinterpret_cast(block_cache->Value(block->cache_handle)) == + block->value); + } + + return s; +} + FilterBlockReader* BlockBasedTable::ReadFilter ( const Slice& filter_handle_value, BlockBasedTable::Rep* rep, @@ -408,270 +596,88 @@ FilterBlockReader* BlockBasedTable::ReadFilter ( rep->options, block.data, block.heap_allocated); } -Status BlockBasedTable::GetBlock( - const BlockBasedTable* table, - const BlockHandle& handle, - const ReadOptions& options, - const bool for_compaction, - const Tickers block_cache_miss_ticker, - const Tickers block_cache_hit_ticker, - bool* didIO, - CachableEntry* entry) { - bool no_io = options.read_tier == kBlockCacheTier; - Cache* block_cache = table->rep_->options.block_cache.get(); - Statistics* statistics = table->rep_->options.statistics.get(); - Status s; - - if (block_cache != nullptr) { - char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; - auto key = GetCacheKey( - table->rep_->cache_key_prefix, - table->rep_->cache_key_prefix_size, - handle, - cache_key - ); - - entry->cache_handle = GetFromBlockCache( - block_cache, - key, - block_cache_miss_ticker, - block_cache_hit_ticker, - statistics - ); - - if (entry->cache_handle != nullptr) { - entry->value = - reinterpret_cast(block_cache->Value(entry->cache_handle)); - } else if (no_io) { - // Did not find in block_cache and can't do IO - return Status::Incomplete("no blocking io"); - } else { - Histograms histogram = for_compaction ? - READ_BLOCK_COMPACTION_MICROS : READ_BLOCK_GET_MICROS; - { - // block for stop watch - StopWatch sw(table->rep_->options.env, statistics, histogram); - s = ReadBlockFromFile( - table->rep_->file.get(), - options, - handle, - &entry->value, - table->rep_->options.env, - didIO - ); - } - if (s.ok()) { - if (options.fill_cache && entry->value->isCachable()) { - entry->cache_handle = block_cache->Insert( - key, entry->value, entry->value->size(), &DeleteCachedBlock); - RecordTick(statistics, BLOCK_CACHE_ADD); - } - } - } - } else if (no_io) { - // Could not read from block_cache and can't do IO - return Status::Incomplete("no blocking io"); - } else { - s = ReadBlockFromFile( - table->rep_->file.get(), - options, - handle, - &entry->value, - table->rep_->options.env, - didIO - ); - } - - return s; -} - // Convert an index iterator value (i.e., an encoded BlockHandle) // into an iterator over the contents of the corresponding block. -Iterator* BlockBasedTable::BlockReader(void* arg, - const ReadOptions& options, - const Slice& index_value, - bool* didIO, - bool for_compaction) { +Iterator* BlockBasedTable::DataBlockReader(void* arg, + const ReadOptions& options, + const Slice& index_value, + bool* didIO, bool for_compaction) { const bool no_io = (options.read_tier == kBlockCacheTier); BlockBasedTable* table = reinterpret_cast(arg); Cache* block_cache = table->rep_->options.block_cache.get(); Cache* block_cache_compressed = table->rep_->options. 
block_cache_compressed.get(); - Statistics* statistics = table->rep_->options.statistics.get(); - Block* block = nullptr; - Block* cblock = nullptr; - Cache::Handle* cache_handle = nullptr; - Cache::Handle* compressed_cache_handle = nullptr; + CachableEntry block; BlockHandle handle; Slice input = index_value; - Status s = handle.DecodeFrom(&input); // We intentionally allow extra stuff in index_value so that we // can add more features in the future. + Status s = handle.DecodeFrom(&input); if (!s.ok()) { return NewErrorIterator(s); } + // If either block cache is enabled, we'll try to read from it. if (block_cache != nullptr || block_cache_compressed != nullptr) { + Statistics* statistics = table->rep_->options.statistics.get(); char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; char compressed_cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; - Slice key, /* key to the block cache */ - ckey /* key to the compressed block cache */ ; + Slice key, /* key to the block cache */ + ckey /* key to the compressed block cache */; // create key for block cache if (block_cache != nullptr) { - key = GetCacheKey( - table->rep_->cache_key_prefix, - table->rep_->cache_key_prefix_size, - handle, - cache_key - ); + key = GetCacheKey(table->rep_->cache_key_prefix, + table->rep_->cache_key_prefix_size, handle, cache_key); } if (block_cache_compressed != nullptr) { - ckey = GetCacheKey( - table->rep_->compressed_cache_key_prefix, - table->rep_->compressed_cache_key_prefix_size, - handle, - compressed_cache_key - ); + ckey = GetCacheKey(table->rep_->compressed_cache_key_prefix, + table->rep_->compressed_cache_key_prefix_size, handle, + compressed_cache_key); } - // Lookup uncompressed cache first - if (block_cache != nullptr) { - assert(!key.empty()); - cache_handle = block_cache->Lookup(key); - if (cache_handle != nullptr) { - block = reinterpret_cast(block_cache->Value(cache_handle)); - RecordTick(statistics, BLOCK_CACHE_HIT); - RecordTick(statistics, BLOCK_CACHE_DATA_HIT); - } else { - RecordTick(statistics, BLOCK_CACHE_MISS); - RecordTick(statistics, BLOCK_CACHE_DATA_MISS); - } - } + s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed, + statistics, options, &block); - // If not found in uncompressed cache, lookup compressed cache - if (block == nullptr && block_cache_compressed != nullptr) { - assert(!ckey.empty()); - compressed_cache_handle = block_cache_compressed->Lookup(ckey); - - // if we found in the compressed cache, then uncompress and - // insert into uncompressed cache - if (compressed_cache_handle != nullptr) { - // found compressed block - cblock = reinterpret_cast(block_cache_compressed-> - Value(compressed_cache_handle)); - assert(cblock->compressionType() != kNoCompression); - - // Retrieve the uncompressed contents into a new buffer - BlockContents contents; - s = UncompressBlockContents(cblock->data(), cblock->size(), - &contents); - - // Insert uncompressed block into block cache - if (s.ok()) { - block = new Block(contents); // uncompressed block - assert(block->compressionType() == kNoCompression); - if (block_cache != nullptr && block->isCachable() && - options.fill_cache) { - cache_handle = block_cache->Insert(key, block, block->size(), - &DeleteCachedBlock); - assert(reinterpret_cast(block_cache->Value(cache_handle)) - == block); - } - } - // Release hold on compressed cache entry - block_cache_compressed->Release(compressed_cache_handle); - RecordTick(statistics, BLOCK_CACHE_COMPRESSED_HIT); - } - } - - if (block != nullptr) { - 
BumpPerfCount(&perf_context.block_cache_hit_count); - } else if (no_io) { - // Did not find in block_cache and can't do IO - return NewErrorIterator(Status::Incomplete("no blocking io")); - } else { + if (block.value == nullptr && !no_io && options.fill_cache) { Histograms histogram = for_compaction ? READ_BLOCK_COMPACTION_MICROS : READ_BLOCK_GET_MICROS; - { // block for stop watch + Block* raw_block = nullptr; + { StopWatch sw(table->rep_->options.env, statistics, histogram); - s = ReadBlockFromFile( - table->rep_->file.get(), - options, - handle, - &cblock, - table->rep_->options.env, - didIO, - block_cache_compressed == nullptr - ); + s = ReadBlockFromFile(table->rep_->file.get(), options, handle, + &raw_block, table->rep_->options.env, didIO, + block_cache_compressed == nullptr); } - if (s.ok()) { - assert(cblock->compressionType() == kNoCompression || - block_cache_compressed != nullptr); - // Retrieve the uncompressed contents into a new buffer - BlockContents contents; - if (cblock->compressionType() != kNoCompression) { - s = UncompressBlockContents(cblock->data(), cblock->size(), - &contents); - } - if (s.ok()) { - if (cblock->compressionType() != kNoCompression) { - block = new Block(contents); // uncompressed block - } else { - block = cblock; - cblock = nullptr; - } - if (block->isCachable() && options.fill_cache) { - // Insert compressed block into compressed block cache. - // Release the hold on the compressed cache entry immediately. - if (block_cache_compressed != nullptr && cblock != nullptr) { - compressed_cache_handle = block_cache_compressed->Insert( - ckey, cblock, cblock->size(), &DeleteCachedBlock); - block_cache_compressed->Release(compressed_cache_handle); - RecordTick(statistics, BLOCK_CACHE_COMPRESSED_MISS); - cblock = nullptr; - } - // insert into uncompressed block cache - assert((block->compressionType() == kNoCompression)); - if (block_cache != nullptr) { - cache_handle = block_cache->Insert( - key, block, block->size(), &DeleteCachedBlock); - RecordTick(statistics, BLOCK_CACHE_ADD); - assert(reinterpret_cast(block_cache->Value( - cache_handle))== block); - } - } - } - } - if (cblock != nullptr) { - delete cblock; + if (s.ok()) { + s = PutDataBlockToCache(key, ckey, block_cache, block_cache_compressed, + options, statistics, &block, raw_block); } } - } else if (no_io) { - // Could not read from block_cache and can't do IO - return NewErrorIterator(Status::Incomplete("no blocking io")); - } else { - s = ReadBlockFromFile( - table->rep_->file.get(), - options, - handle, - &block, - table->rep_->options.env, - didIO - ); + } + + // Didn't get any data from block caches. 
+ if (block.value == nullptr) { + if (no_io) { + // Could not read from block_cache and can't do IO + return NewErrorIterator(Status::Incomplete("no blocking io")); + } + s = ReadBlockFromFile(table->rep_->file.get(), options, handle, + &block.value, table->rep_->options.env, didIO); } Iterator* iter; - if (block != nullptr) { - iter = block->NewIterator(&(table->rep_->internal_comparator_)); - if (cache_handle != nullptr) { - iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle); + if (block.value != nullptr) { + iter = block.value->NewIterator(&table->rep_->internal_comparator); + if (block.cache_handle != nullptr) { + iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, + block.cache_handle); } else { - iter->RegisterCleanup(&DeleteBlock, block, nullptr); + iter->RegisterCleanup(&DeleteHeldResource, block.value, nullptr); } } else { iter = NewErrorIterator(s); @@ -679,8 +685,8 @@ Iterator* BlockBasedTable::BlockReader(void* arg, return iter; } -BlockBasedTable::CachableEntry -BlockBasedTable::GetFilter(bool no_io) const { +BlockBasedTable::CachableEntry BlockBasedTable::GetFilter( + bool no_io) const { // filter pre-populated if (rep_->filter != nullptr) { return {rep_->filter.get(), nullptr /* cache handle */}; @@ -702,13 +708,9 @@ BlockBasedTable::GetFilter(bool no_io) const { ); Statistics* statistics = rep_->options.statistics.get(); - auto cache_handle = GetFromBlockCache( - block_cache, - key, - BLOCK_CACHE_FILTER_MISS, - BLOCK_CACHE_FILTER_HIT, - statistics - ); + auto cache_handle = + GetEntryFromCache(block_cache, key, BLOCK_CACHE_FILTER_MISS, + BLOCK_CACHE_FILTER_HIT, statistics); FilterBlockReader* filter = nullptr; if (cache_handle != nullptr) { @@ -734,7 +736,7 @@ BlockBasedTable::GetFilter(bool no_io) const { assert(filter_size > 0); cache_handle = block_cache->Insert( - key, filter, filter_size, &DeleteCachedFilter); + key, filter, filter_size, &DeleteCachedEntry); RecordTick(statistics, BLOCK_CACHE_ADD); } } @@ -743,50 +745,59 @@ BlockBasedTable::GetFilter(bool no_io) const { return { filter, cache_handle }; } -// Get the iterator from the index block. -Iterator* BlockBasedTable::IndexBlockReader(const ReadOptions& options) const { - if (rep_->index_block) { - return rep_->index_block->NewIterator(&(rep_->internal_comparator_)); +Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options) + const { + // index reader has already been pre-populated. 
+ if (rep_->index_reader) { + return rep_->index_reader->NewIterator(); } - // get index block from cache - assert (rep_->options.block_cache); - bool didIO = false; - CachableEntry entry; + bool no_io = read_options.read_tier == kBlockCacheTier; + Cache* block_cache = rep_->options.block_cache.get(); + char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; + auto key = GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, + rep_->index_handle, cache_key); + Statistics* statistics = rep_->options.statistics.get(); + auto cache_handle = + GetEntryFromCache(block_cache, key, BLOCK_CACHE_INDEX_MISS, + BLOCK_CACHE_INDEX_HIT, statistics); - auto s = GetBlock( - this, - rep_->index_handle, - options, - false, /* for compaction */ - BLOCK_CACHE_INDEX_MISS, - BLOCK_CACHE_INDEX_HIT, - &didIO, - &entry - ); + if (cache_handle == nullptr && no_io) { + return NewErrorIterator(Status::Incomplete("no blocking io")); + } - Iterator* iter; - if (entry.value != nullptr) { - iter = entry.value->NewIterator(&(rep_->internal_comparator_)); - if (entry.cache_handle) { - iter->RegisterCleanup( - &ReleaseBlock, rep_->options.block_cache.get(), entry.cache_handle - ); - } else { - iter->RegisterCleanup(&DeleteBlock, entry.value, nullptr); - } + IndexReader* index_reader = nullptr; + if (cache_handle != nullptr) { + index_reader = + reinterpret_cast(block_cache->Value(cache_handle)); } else { - iter = NewErrorIterator(s); + // Create index reader and put it in the cache. + Status s; + s = CreateIndexReader(&index_reader); + + if (!s.ok()) { + // make sure if something goes wrong, index_reader shall remain intact. + assert(index_reader == nullptr); + return NewErrorIterator(s); + } + + cache_handle = block_cache->Insert(key, index_reader, index_reader->size(), + &DeleteCachedEntry); + RecordTick(statistics, BLOCK_CACHE_ADD); } + + assert(cache_handle); + auto iter = index_reader->NewIterator(); + iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, cache_handle); + return iter; } -Iterator* BlockBasedTable::BlockReader(void* arg, const ReadOptions& options, - const EnvOptions& soptions, - const InternalKeyComparator& icomparator, - const Slice& index_value, - bool for_compaction) { - return BlockReader(arg, options, index_value, nullptr, for_compaction); +Iterator* BlockBasedTable::DataBlockReader( + void* arg, const ReadOptions& options, const EnvOptions& soptions, + const InternalKeyComparator& icomparator, const Slice& index_value, + bool for_compaction) { + return DataBlockReader(arg, options, index_value, nullptr, for_compaction); } // This will be broken if the user specifies an unusual implementation @@ -814,9 +825,7 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_prefix) { // loaded to memory. 
ReadOptions no_io_read_options; no_io_read_options.read_tier = kBlockCacheTier; - unique_ptr iiter( - IndexBlockReader(no_io_read_options) - ); + unique_ptr iiter(NewIndexIterator(no_io_read_options)); iiter->Seek(internal_prefix); if (!iiter->Valid()) { @@ -874,20 +883,20 @@ Iterator* BlockBasedTable::NewIterator(const ReadOptions& options) { } } - return NewTwoLevelIterator(IndexBlockReader(options), - &BlockBasedTable::BlockReader, + return NewTwoLevelIterator(NewIndexIterator(options), - &BlockBasedTable::BlockReader, + &BlockBasedTable::DataBlockReader, const_cast(this), options, - rep_->soptions, rep_->internal_comparator_); + rep_->soptions, rep_->internal_comparator); } Status BlockBasedTable::Get( - const ReadOptions& readOptions, const Slice& key, void* handle_context, + const ReadOptions& read_options, const Slice& key, void* handle_context, bool (*result_handler)(void* handle_context, const ParsedInternalKey& k, const Slice& v, bool didIO), void (*mark_key_may_exist_handler)(void* handle_context)) { Status s; - Iterator* iiter = IndexBlockReader(readOptions); - auto filter_entry = GetFilter(readOptions.read_tier == kBlockCacheTier); + Iterator* iiter = NewIndexIterator(read_options); + auto filter_entry = GetFilter(read_options.read_tier == kBlockCacheTier); FilterBlockReader* filter = filter_entry.value; bool done = false; for (iiter->Seek(key); iiter->Valid() && !done; iiter->Next()) { @@ -908,9 +917,9 @@ Status BlockBasedTable::Get( } else { bool didIO = false; unique_ptr block_iter( - BlockReader(this, readOptions, iiter->value(), &didIO)); + DataBlockReader(this, read_options, iiter->value(), &didIO)); - if (readOptions.read_tier && block_iter->status().IsIncomplete()) { + if (read_options.read_tier && block_iter->status().IsIncomplete()) { // couldn't get block from block_cache // Update Saver.state to Found because we are only looking for whether // we can guarantee the key is not there when "no_io" is set @@ -958,8 +967,41 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options, return !didIO; } +// REQUIRES: The following fields of rep_ should have already been populated: +// 1. file +// 2. index_handle +// 3. options +// 4. internal_comparator +// 5. index_type +Status BlockBasedTable::CreateIndexReader(IndexReader** index_reader) const { + // Some old versions of block-based tables don't have the index type present + // in their table properties. If that's the case, we can safely fall back to + // kBinarySearch. + auto index_type = BlockBasedTableOptions::kBinarySearch; + auto& props = rep_->table_properties->user_collected_properties; + auto pos = props.find(BlockBasedTablePropertyNames::kIndexType); + if (pos != props.end()) { + index_type = static_cast( + DecodeFixed32(pos->second.c_str())); + } + + switch (index_type) { + case BlockBasedTableOptions::kBinarySearch: { + return BinarySearchIndexReader::Create( + rep_->file.get(), rep_->index_handle, rep_->options.env, + &rep_->internal_comparator, index_reader); + } + default: { + std::string error_message = + "Unrecognized index type: " + std::to_string(rep_->index_type); + // equivalent to assert(false), but more informative.
+ assert(!error_message.c_str()); + return Status::InvalidArgument(error_message.c_str()); + } + } +} + uint64_t BlockBasedTable::ApproximateOffsetOf(const Slice& key) { - Iterator* index_iter = IndexBlockReader(ReadOptions()); + unique_ptr index_iter(NewIndexIterator(ReadOptions())); index_iter->Seek(key); uint64_t result; @@ -976,12 +1018,15 @@ uint64_t BlockBasedTable::ApproximateOffsetOf(const Slice& key) { result = rep_->metaindex_handle.offset(); } } else { - // key is past the last key in the file. Approximate the offset - // by returning the offset of the metaindex block (which is - // right near the end of the file). - result = rep_->metaindex_handle.offset(); + // key is past the last key in the file. If table_properties is not + // available, approximate the offset by returning the offset of the + // metaindex block (which is right near the end of the file). + result = rep_->table_properties->data_size; + // table_properties is not present in the table. + if (result == 0) { + result = rep_->metaindex_handle.offset(); + } } - delete index_iter; return result; } @@ -989,8 +1034,8 @@ bool BlockBasedTable::TEST_filter_block_preloaded() const { return rep_->filter != nullptr; } -bool BlockBasedTable::TEST_index_block_preloaded() const { - return rep_->index_block != nullptr; +bool BlockBasedTable::TEST_index_reader_preloaded() const { + return rep_->index_reader != nullptr; } } // namespace rocksdb diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index fc584d9ece..8b8f09bd39 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -8,12 +8,14 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once -#include + #include -#include "rocksdb/cache.h" -#include "rocksdb/env.h" -#include "rocksdb/iterator.h" +#include +#include + #include "rocksdb/statistics.h" +#include "rocksdb/status.h" +#include "rocksdb/table.h" #include "table/table_reader.h" #include "util/coding.h" @@ -21,14 +23,19 @@ namespace rocksdb { class Block; class BlockHandle; +class Cache; +class FilterBlockReader; class Footer; -struct Options; +class InternalKeyComparator; +class Iterator; class RandomAccessFile; -struct ReadOptions; class TableCache; class TableReader; -class FilterBlockReader; +class WritableFile; struct BlockBasedTableOptions; +struct EnvOptions; +struct Options; +struct ReadOptions; using std::unique_ptr; @@ -91,7 +98,9 @@ class BlockBasedTable : public TableReader { ~BlockBasedTable(); bool TEST_filter_block_preloaded() const; - bool TEST_index_block_preloaded() const; + bool TEST_index_reader_preloaded() const; + // Implementation of IndexReader will be exposed to internal cc file only. + class IndexReader; private: template @@ -101,40 +110,51 @@ class BlockBasedTable : public TableReader { Rep* rep_; bool compaction_optimized_; - static Iterator* BlockReader(void*, const ReadOptions&, - const EnvOptions& soptions, - const InternalKeyComparator& icomparator, - const Slice&, bool for_compaction); + static Iterator* DataBlockReader(void*, const ReadOptions&, + const EnvOptions& soptions, + const InternalKeyComparator& icomparator, + const Slice&, bool for_compaction); - static Iterator* BlockReader(void*, const ReadOptions&, const Slice&, - bool* didIO, bool for_compaction = false); + static Iterator* DataBlockReader(void*, const ReadOptions&, const Slice&, + bool* didIO, bool for_compaction = false); - // if `no_io == true`, we will not try to read filter from sst file - // if it is not cached yet. 
+ // For the following two functions: + // if `no_io == true`, we will not try to read the filter/index from the sst + // file if they are not present in the cache yet. CachableEntry GetFilter(bool no_io = false) const; - Iterator* IndexBlockReader(const ReadOptions& options) const; + // Get the iterator from the index reader. + // + // Note: ErrorIterator with Status::Incomplete shall be returned if all the + // following conditions are met: + // 1. We enabled table_options.cache_index_and_filter_blocks. + // 2. index is not present in block cache. + // 3. We disallowed any io to be performed, that is, + // read_options.read_tier == kBlockCacheTier + Iterator* NewIndexIterator(const ReadOptions& read_options) const; - // Read the block, either from sst file or from cache. This method will try - // to read from cache only when block_cache is set or ReadOption doesn't - // explicitly prohibit storage IO. + // Read a block from the block caches (if set): block_cache and + // block_cache_compressed. + // On success, Status::OK will be returned and @block will be populated with + // a pointer to the block as well as its cache handle. + static Status GetDataBlockFromCache( + const Slice& block_cache_key, const Slice& compressed_block_cache_key, + Cache* block_cache, Cache* block_cache_compressed, Statistics* statistics, + const ReadOptions& read_options, + BlockBasedTable::CachableEntry* block); + // Put a raw block (maybe compressed) into the corresponding block caches. + // This method will perform decompression against raw_block if needed and then + // populate the block caches. + // On success, Status::OK will be returned; also @block will be populated with + // the uncompressed block and its cache handle. // - // If the block is read from cache, the statistics for cache miss/hit of the - // the given type of block will be updated. User can specify - // `block_cache_miss_ticker` and `block_cache_hit_ticker` for the statistics - // update. - // - // On success, the `result` parameter will be populated, which contains a - // pointer to the block and its cache handle, which will be nullptr if it's - // not read from the cache. - static Status GetBlock(const BlockBasedTable* table, - const BlockHandle& handle, - const ReadOptions& options, - bool for_compaction, - Tickers block_cache_miss_ticker, - Tickers block_cache_hit_ticker, - bool* didIO, - CachableEntry* result); + // REQUIRES: raw_block is heap-allocated. PutDataBlockToCache() will be + // responsible for releasing its memory if an error occurs. + static Status PutDataBlockToCache( + const Slice& block_cache_key, const Slice& compressed_block_cache_key, + Cache* block_cache, Cache* block_cache_compressed, + const ReadOptions& read_options, Statistics* statistics, + CachableEntry* block, Block* raw_block); // Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found // after a call to Seek(key), until handle_result returns false. @@ -144,6 +164,7 @@ class BlockBasedTable : public TableReader { void ReadMeta(const Footer& footer); void ReadFilter(const Slice& filter_handle_value); + Status CreateIndexReader(IndexReader** index_reader) const; // Read the meta block from sst.
static Status ReadMetaBlock( @@ -159,10 +180,9 @@ class BlockBasedTable : public TableReader { static void SetupCacheKeyPrefix(Rep* rep); - explicit BlockBasedTable(Rep* rep) : - compaction_optimized_(false) { - rep_ = rep; - } + explicit BlockBasedTable(Rep* rep) + : rep_(rep), compaction_optimized_(false) {} + // Generate a cache key prefix from the file static void GenerateCachePrefix(Cache* cc, RandomAccessFile* file, char* buffer, size_t* size); diff --git a/table/flush_block_policy.cc b/table/flush_block_policy.cc index a953a78a79..4e2235205f 100644 --- a/table/flush_block_policy.cc +++ b/table/flush_block_policy.cc @@ -3,6 +3,7 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. +#include "rocksdb/options.h" #include "rocksdb/flush_block_policy.h" #include "rocksdb/slice.h" #include "table/block_builder.h" @@ -61,10 +62,9 @@ class FlushBlockBySizePolicy : public FlushBlockPolicy { }; FlushBlockPolicy* FlushBlockBySizePolicyFactory::NewFlushBlockPolicy( - const BlockBuilder& data_block_builder) const { - return new FlushBlockBySizePolicy(block_size_, - block_size_deviation_, - data_block_builder); + const Options& options, const BlockBuilder& data_block_builder) const { + return new FlushBlockBySizePolicy( + options.block_size, options.block_size_deviation, data_block_builder); } } // namespace rocksdb diff --git a/table/plain_table_reader.cc b/table/plain_table_reader.cc index 593530c87f..8b675af113 100644 --- a/table/plain_table_reader.cc +++ b/table/plain_table_reader.cc @@ -527,13 +527,14 @@ Status PlainTableReader::ReadKey(const char* start, ParsedInternalKey* key, key_ptr = GetVarint32Ptr(start, file_data_.data() + data_end_offset_, &tmp_size); if (key_ptr == nullptr) { - return Status::Corruption("Unable to read the next key"); + return Status::Corruption( + "Unexpected EOF when reading the next key's size"); } user_key_size = (size_t)tmp_size; *bytes_read = key_ptr - start; } if (key_ptr + user_key_size + 1 >= file_data_.data() + data_end_offset_) { - return Status::Corruption("Unable to read the next key"); + return Status::Corruption("Unexpected EOF when reading the next key"); } if (*(key_ptr + user_key_size) == PlainTableFactory::kValueTypeSeqId0) { @@ -544,10 +545,12 @@ Status PlainTableReader::ReadKey(const char* start, ParsedInternalKey* key, *bytes_read += user_key_size + 1; } else { if (start + user_key_size + 8 >= file_data_.data() + data_end_offset_) { - return Status::Corruption("Unable to read the next key"); + return Status::Corruption( + "Unexpected EOF when reading internal bytes of the next key"); } if (!ParseInternalKey(Slice(key_ptr, user_key_size + 8), key)) { - return Status::Corruption(Slice()); + return Status::Corruption( + Slice("Incorrect value type found when reading the next key")); } *bytes_read += user_key_size + 8; } @@ -569,15 +572,19 @@ Status PlainTableReader::Next(uint32_t* offset, ParsedInternalKey* key, const char* start = file_data_.data() + *offset; size_t bytes_for_key; Status s = ReadKey(start, key, &bytes_for_key); + if (!s.ok()) { + return s; + } uint32_t value_size; const char* value_ptr = GetVarint32Ptr( start + bytes_for_key, file_data_.data() + data_end_offset_, &value_size); if (value_ptr == nullptr) { - return Status::Corruption("Error reading value length."); + return Status::Corruption( + "Unexpected EOF when reading the next value's size."); } *offset = *offset + (value_ptr - start) + value_size; if (*offset > 
data_end_offset_) { - return Status::Corruption("Reach end of file when reading value"); + return Status::Corruption("Unexpected EOF when reading the next value. "); } *value = Slice(value_ptr, value_size); diff --git a/table/table_test.cc b/table/table_test.cc index 2f6881dd00..b2173ea896 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -9,6 +9,7 @@ #include #include + #include #include #include @@ -16,8 +17,6 @@ #include #include "db/dbformat.h" -#include "rocksdb/statistics.h" -#include "util/statistics.h" #include "db/memtable.h" #include "db/write_batch_internal.h" @@ -25,11 +24,11 @@ #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" -#include "rocksdb/slice_transform.h" #include "rocksdb/memtablerep.h" +#include "rocksdb/slice_transform.h" +#include "rocksdb/statistics.h" + #include "table/block.h" -#include "table/meta_blocks.h" -#include "table/block_based_table_reader.h" #include "table/block_based_table_builder.h" #include "table/block_based_table_factory.h" #include "table/block_based_table_reader.h" @@ -39,6 +38,7 @@ #include "table/plain_table_factory.h" #include "util/random.h" +#include "util/statistics.h" #include "util/testharness.h" #include "util/testutil.h" @@ -690,8 +690,7 @@ class Harness { switch (args.type) { case BLOCK_BASED_TABLE_TEST: table_options.flush_block_policy_factory.reset( - new FlushBlockBySizePolicyFactory(options_.block_size, - options_.block_size_deviation)); + new FlushBlockBySizePolicyFactory()); options_.table_factory.reset(new BlockBasedTableFactory(table_options)); constructor_ = new TableConstructor(options_.comparator); break; @@ -1203,7 +1202,7 @@ TEST(BlockBasedTableTest, BlockCacheDisabledTest) { // preloading filter/index blocks is enabled. auto reader = dynamic_cast(c.table_reader()); ASSERT_TRUE(reader->TEST_filter_block_preloaded()); - ASSERT_TRUE(reader->TEST_index_block_preloaded()); + ASSERT_TRUE(reader->TEST_index_reader_preloaded()); { // nothing happens in the beginning @@ -1244,7 +1243,7 @@ TEST(BlockBasedTableTest, FilterBlockInBlockCache) { // preloading filter/index blocks is prohibited. auto reader = dynamic_cast(c.table_reader()); ASSERT_TRUE(!reader->TEST_filter_block_preloaded()); - ASSERT_TRUE(!reader->TEST_index_block_preloaded()); + ASSERT_TRUE(!reader->TEST_index_reader_preloaded()); // -- PART 1: Open with regular block cache. // Since block_cache is disabled, no cache activities will be involved. 
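The two tests above pivot on whether index and filter blocks are preloaded into Rep or served from the block cache. As a usage sketch of the option that controls this (the option and factory names are taken from this patch; the cache size is an arbitrary example):

#include "rocksdb/cache.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"

rocksdb::Options MakeOptions() {
  rocksdb::BlockBasedTableOptions table_options;
  // Index and filter blocks are fetched through the block cache and can be
  // evicted, instead of being pinned in the table reader for its lifetime.
  table_options.cache_index_and_filter_blocks = true;

  rocksdb::Options options;
  options.block_cache = rocksdb::NewLRUCache(64 << 20);  // 64 MB, arbitrary
  options.table_factory.reset(
      rocksdb::NewBlockBasedTableFactory(table_options));
  return options;
}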
diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 6270d69ca6..b3d08f8880 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -70,7 +70,7 @@ def main(argv): --threads=%s --write_buffer_size=%s --destroy_db_initially=0 - --reopen=0 + --reopen=20 --readpercent=45 --prefixpercent=5 --writepercent=35 diff --git a/tools/db_crashtest2.py b/tools/db_crashtest2.py index dbb7059fe0..8b7ce969df 100644 --- a/tools/db_crashtest2.py +++ b/tools/db_crashtest2.py @@ -84,7 +84,7 @@ def main(argv): --threads=%s --write_buffer_size=%s --destroy_db_initially=0 - --reopen=0 + --reopen=20 --readpercent=45 --prefixpercent=5 --writepercent=35 diff --git a/util/auto_roll_logger.h b/util/auto_roll_logger.h index 68705a2db9..b0cf1bfcea 100644 --- a/util/auto_roll_logger.h +++ b/util/auto_roll_logger.h @@ -17,20 +17,21 @@ namespace rocksdb { class AutoRollLogger : public Logger { public: AutoRollLogger(Env* env, const std::string& dbname, - const std::string& db_log_dir, - size_t log_max_size, - size_t log_file_time_to_roll): - dbname_(dbname), - db_log_dir_(db_log_dir), - env_(env), - status_(Status::OK()), - kMaxLogFileSize(log_max_size), - kLogFileTimeToRoll(log_file_time_to_roll), - cached_now(static_cast(env_->NowMicros() * 1e-6)), - ctime_(cached_now), - cached_now_access_count(0), - call_NowMicros_every_N_records_(100), - mutex_() { + const std::string& db_log_dir, size_t log_max_size, + size_t log_file_time_to_roll, + const InfoLogLevel log_level = InfoLogLevel::ERROR) + : Logger(log_level), + dbname_(dbname), + db_log_dir_(db_log_dir), + env_(env), + status_(Status::OK()), + kMaxLogFileSize(log_max_size), + kLogFileTimeToRoll(log_file_time_to_roll), + cached_now(static_cast(env_->NowMicros() * 1e-6)), + ctime_(cached_now), + cached_now_access_count(0), + call_NowMicros_every_N_records_(100), + mutex_() { env->GetAbsolutePath(dbname, &db_absolute_path_); log_fname_ = InfoLogFileName(dbname_, db_absolute_path_, db_log_dir_); RollLogFile(); diff --git a/util/auto_roll_logger_test.cc b/util/auto_roll_logger_test.cc index d8dbd91825..742713e9d3 100755 --- a/util/auto_roll_logger_test.cc +++ b/util/auto_roll_logger_test.cc @@ -5,12 +5,15 @@ // #include #include +#include +#include +#include +#include #include "util/testharness.h" #include "util/auto_roll_logger.h" #include "rocksdb/db.h" #include #include -#include using namespace std; @@ -39,10 +42,8 @@ class AutoRollLoggerTest { const string AutoRollLoggerTest::kSampleMessage( "this is the message to be written to the log file!!"); -const string AutoRollLoggerTest::kTestDir( - test::TmpDir() + "/db_log_test"); -const string AutoRollLoggerTest::kLogFile( - test::TmpDir() + "/db_log_test/LOG"); +const string AutoRollLoggerTest::kTestDir(test::TmpDir() + "/db_log_test"); +const string AutoRollLoggerTest::kLogFile(test::TmpDir() + "/db_log_test/LOG"); Env* AutoRollLoggerTest::env = Env::Default(); // In this test we only want to Log some simple log message with @@ -53,6 +54,11 @@ void LogMessage(Logger* logger, const char* message) { Log(logger, "%s", message); } +void LogMessage(const InfoLogLevel log_level, Logger* logger, + const char* message) { + Log(log_level, logger, "%s", message); +} + void GetFileCreateTime(const std::string& fname, uint64_t* file_ctime) { struct stat s; if (stat(fname.c_str(), &s) != 0) { @@ -64,6 +70,7 @@ void GetFileCreateTime(const std::string& fname, uint64_t* file_ctime) { void AutoRollLoggerTest::RollLogFileBySizeTest(AutoRollLogger* logger, size_t log_max_size, const string& log_message) { + 
logger->SetInfoLogLevel(InfoLogLevel::INFO); // measure the size of each message, which is supposed // to be equal to or greater than log_message.size() LogMessage(logger, log_message.c_str()); @@ -131,7 +138,6 @@ TEST(AutoRollLoggerTest, RollLogFileBySize) { RollLogFileBySizeTest(&logger, log_max_size, kSampleMessage + ":RollLogFileBySize"); - } TEST(AutoRollLoggerTest, RollLogFileByTime) { @@ -235,6 +241,46 @@ TEST(AutoRollLoggerTest, CreateLoggerFromOptions) { kSampleMessage + ":CreateLoggerFromOptions - both"); } +TEST(AutoRollLoggerTest, InfoLogLevel) { + InitTestDb(); + + size_t log_size = 8192; + size_t log_lines = 0; + // an extra scope to force the AutoRollLogger to flush the log file when it + // goes out of scope. + { + AutoRollLogger logger(Env::Default(), kTestDir, "", log_size, 0); + for (int log_level = InfoLogLevel::FATAL; log_level >= InfoLogLevel::DEBUG; + log_level--) { + logger.SetInfoLogLevel((InfoLogLevel)log_level); + for (int log_type = InfoLogLevel::DEBUG; log_type <= InfoLogLevel::FATAL; + log_type++) { + // log messages with a log level smaller than log_level will not be + // logged. + LogMessage((InfoLogLevel)log_type, &logger, kSampleMessage.c_str()); + } + log_lines += InfoLogLevel::FATAL - log_level + 1; + } + for (int log_level = InfoLogLevel::FATAL; log_level >= InfoLogLevel::DEBUG; + log_level--) { + logger.SetInfoLogLevel((InfoLogLevel)log_level); + + // again, messages with a level smaller than log_level will not be logged. + Debug(&logger, "%s", kSampleMessage.c_str()); + Info(&logger, "%s", kSampleMessage.c_str()); + Warn(&logger, "%s", kSampleMessage.c_str()); + Error(&logger, "%s", kSampleMessage.c_str()); + Fatal(&logger, "%s", kSampleMessage.c_str()); + log_lines += InfoLogLevel::FATAL - log_level + 1; + } + } + std::ifstream inFile(AutoRollLoggerTest::kLogFile.c_str()); + size_t lines = std::count(std::istreambuf_iterator(inFile), + std::istreambuf_iterator(), '\n'); + ASSERT_EQ(log_lines, lines); + inFile.close(); +} + int OldLogFileCount(const string& dir) { std::vector files; Env::Default()->GetChildren(dir, &files); diff --git a/util/env.cc b/util/env.cc index 6964a72ca3..1d89e15ae0 100644 --- a/util/env.cc +++ b/util/env.cc @@ -45,12 +45,120 @@ void Log(Logger* info_log, const char* format, ...) { } } +void Log(const InfoLogLevel log_level, Logger* info_log, const char* format, + ...) { + if (info_log) { + va_list ap; + va_start(ap, format); + info_log->Logv(log_level, format, ap); + va_end(ap); + } +} + +void Debug(Logger* info_log, const char* format, ...) { + if (info_log) { + va_list ap; + va_start(ap, format); + info_log->Logv(InfoLogLevel::DEBUG, format, ap); + va_end(ap); + } +} + +void Info(Logger* info_log, const char* format, ...) { + if (info_log) { + va_list ap; + va_start(ap, format); + info_log->Logv(InfoLogLevel::INFO, format, ap); + va_end(ap); + } +} + +void Warn(Logger* info_log, const char* format, ...) { + if (info_log) { + va_list ap; + va_start(ap, format); + info_log->Logv(InfoLogLevel::WARN, format, ap); + va_end(ap); + } +} +void Error(Logger* info_log, const char* format, ...) { + if (info_log) { + va_list ap; + va_start(ap, format); + info_log->Logv(InfoLogLevel::ERROR, format, ap); + va_end(ap); + } +} +void Fatal(Logger* info_log, const char* format, ...)
{ + if (info_log) { + va_list ap; + va_start(ap, format); + info_log->Logv(InfoLogLevel::FATAL, format, ap); + va_end(ap); + } +} + void LogFlush(const shared_ptr& info_log) { if (info_log) { info_log->Flush(); } } +void Log(const InfoLogLevel log_level, const shared_ptr& info_log, + const char* format, ...) { + if (info_log) { + va_list ap; + va_start(ap, format); + info_log->Logv(log_level, format, ap); + va_end(ap); + } +} + +void Debug(const shared_ptr& info_log, const char* format, ...) { + if (info_log) { + va_list ap; + va_start(ap, format); + info_log->Logv(InfoLogLevel::DEBUG, format, ap); + va_end(ap); + } +} + +void Info(const shared_ptr& info_log, const char* format, ...) { + if (info_log) { + va_list ap; + va_start(ap, format); + info_log->Logv(InfoLogLevel::INFO, format, ap); + va_end(ap); + } +} + +void Warn(const shared_ptr& info_log, const char* format, ...) { + if (info_log) { + va_list ap; + va_start(ap, format); + info_log->Logv(InfoLogLevel::WARN, format, ap); + va_end(ap); + } +} + +void Error(const shared_ptr& info_log, const char* format, ...) { + if (info_log) { + va_list ap; + va_start(ap, format); + info_log->Logv(InfoLogLevel::ERROR, format, ap); + va_end(ap); + } +} + +void Fatal(const shared_ptr& info_log, const char* format, ...) { + if (info_log) { + va_list ap; + va_start(ap, format); + info_log->Logv(InfoLogLevel::FATAL, format, ap); + va_end(ap); + } +} + void Log(const shared_ptr& info_log, const char* format, ...) { if (info_log) { va_list ap; @@ -129,6 +237,12 @@ void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) { } +EnvOptions EnvOptions::AdaptForLogWrite() const { + EnvOptions adapted = *this; + adapted.use_mmap_writes = false; + return adapted; +} + EnvOptions::EnvOptions(const DBOptions& options) { AssignEnvOptions(this, options); } diff --git a/util/env_hdfs.cc b/util/env_hdfs.cc index 67f0ef797b..c724b2302c 100644 --- a/util/env_hdfs.cc +++ b/util/env_hdfs.cc @@ -236,8 +236,9 @@ class HdfsLogger : public Logger { uint64_t (*gettid_)(); // Return the thread id for the current thread public: - HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)()) - : file_(f), gettid_(gettid) { + HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)(), + const InfoLogLevel log_level = InfoLogLevel::ERROR) + : Logger(log_level), file_(f), gettid_(gettid) { Log(mylog, "[hdfs] HdfsLogger opened %s\n", file_->getName().c_str()); } diff --git a/util/options.cc b/util/options.cc index 907c2a8ea5..63bedb858e 100644 --- a/util/options.cc +++ b/util/options.cc @@ -176,7 +176,8 @@ DBOptions::DBOptions() advise_random_on_open(true), access_hint_on_compaction_start(NORMAL), use_adaptive_mutex(false), - bytes_per_sync(0) { } + bytes_per_sync(0), + allow_thread_local(true) {} DBOptions::DBOptions(const Options& options) : create_if_missing(options.create_if_missing), @@ -214,7 +215,8 @@ DBOptions::DBOptions(const Options& options) advise_random_on_open(options.advise_random_on_open), access_hint_on_compaction_start(options.access_hint_on_compaction_start), use_adaptive_mutex(options.use_adaptive_mutex), - bytes_per_sync(options.bytes_per_sync) {} + bytes_per_sync(options.bytes_per_sync), + allow_thread_local(options.allow_thread_local) {} static const char* const access_hints[] = { "NONE", "NORMAL", "SEQUENTIAL", "WILLNEED" diff --git a/util/posix_logger.h b/util/posix_logger.h index 8f7463c98b..a1086973e9 100644 --- a/util/posix_logger.h +++ b/util/posix_logger.h @@ -38,9 +38,16 @@ class PosixLogger : public Logger { Env* env_; bool flush_pending_; public: - 
diff --git a/util/posix_logger.h b/util/posix_logger.h
index 8f7463c98b..a1086973e9 100644
--- a/util/posix_logger.h
+++ b/util/posix_logger.h
@@ -38,9 +38,16 @@ class PosixLogger : public Logger {
   Env* env_;
   bool flush_pending_;
 
  public:
-  PosixLogger(FILE* f, uint64_t (*gettid)(), Env* env) :
-    file_(f), gettid_(gettid), log_size_(0), fd_(fileno(f)),
-    last_flush_micros_(0), env_(env), flush_pending_(false) { }
+  PosixLogger(FILE* f, uint64_t (*gettid)(), Env* env,
+              const InfoLogLevel log_level = InfoLogLevel::ERROR)
+      : Logger(log_level),
+        file_(f),
+        gettid_(gettid),
+        log_size_(0),
+        fd_(fileno(f)),
+        last_flush_micros_(0),
+        env_(env),
+        flush_pending_(false) {}
   virtual ~PosixLogger() {
     fclose(file_);
   }
diff --git a/util/statistics.h b/util/statistics.h
index d8cb36e0a6..d57a1dd4b3 100644
--- a/util/statistics.h
+++ b/util/statistics.h
@@ -7,11 +7,11 @@
 #include "rocksdb/statistics.h"
 #include "util/histogram.h"
 #include "util/mutexlock.h"
+#include "port/likely.h"
 
 #include <vector>
 #include <atomic>
 
-#define UNLIKELY(val) (__builtin_expect((val), 0))
 
 namespace rocksdb {
 
diff --git a/util/thread_local.cc b/util/thread_local.cc
index 90571b97e7..2e5d3618bf 100644
--- a/util/thread_local.cc
+++ b/util/thread_local.cc
@@ -9,12 +9,8 @@
 #include "util/thread_local.h"
 #include "util/mutexlock.h"
+#include "port/likely.h"
 
-#if defined(__GNUC__) && __GNUC__ >= 4
-#define UNLIKELY(x) (__builtin_expect((x), 0))
-#else
-#define UNLIKELY(x) (x)
-#endif
 
 namespace rocksdb {
 
diff --git a/util/thread_local.h b/util/thread_local.h
index d6fc5f085d..1dfc3cf137 100644
--- a/util/thread_local.h
+++ b/util/thread_local.h
@@ -16,6 +16,7 @@
 #include "util/autovector.h"
 #include "port/port_posix.h"
+#include "util/thread_local.h"
 
 namespace rocksdb {
 
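Several of the files above drop their private `UNLIKELY` definitions in favor of a shared `#include "port/likely.h"`. That header is not itself part of this diff; a plausible sketch of its contents, inferred from the macro deleted in `util/thread_local.cc`, is:

```cpp
// port/likely.h: assumed contents; the real header is not shown in this diff.
#pragma once

#if defined(__GNUC__) && __GNUC__ >= 4
// __builtin_expect tells the compiler which branch outcome is expected,
// letting it lay out the hot path as the fall-through case.
#define LIKELY(x)   (__builtin_expect((x), 1))
#define UNLIKELY(x) (__builtin_expect((x), 0))
#else
// Compilers without __builtin_expect fall back to the plain expression.
#define LIKELY(x)   (x)
#define UNLIKELY(x) (x)
#endif
```

Centralizing the macro removes duplicate definitions that could otherwise trigger redefinition warnings when `util/statistics.h` and the thread-local code meet in one translation unit.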
diff --git a/util/thread_local_test.cc b/util/thread_local_test.cc
index bc7aa5b525..96e35d9592 100644
--- a/util/thread_local_test.cc
+++ b/util/thread_local_test.cc
@@ -58,52 +58,52 @@ TEST(ThreadLocalTest, UniqueIdTest) {
   port::Mutex mu;
   port::CondVar cv(&mu);
 
-  ASSERT_EQ(IDChecker::PeekId(), 0);
+  ASSERT_EQ(IDChecker::PeekId(), 0u);
   // New ThreadLocal instance bumps id by 1
   {
     // Id used 0
-    Params p1(&mu, &cv, nullptr, 1);
-    ASSERT_EQ(IDChecker::PeekId(), 1);
+    Params p1(&mu, &cv, nullptr, 1u);
+    ASSERT_EQ(IDChecker::PeekId(), 1u);
     // Id used 1
-    Params p2(&mu, &cv, nullptr, 1);
-    ASSERT_EQ(IDChecker::PeekId(), 2);
+    Params p2(&mu, &cv, nullptr, 1u);
+    ASSERT_EQ(IDChecker::PeekId(), 2u);
     // Id used 2
-    Params p3(&mu, &cv, nullptr, 1);
-    ASSERT_EQ(IDChecker::PeekId(), 3);
+    Params p3(&mu, &cv, nullptr, 1u);
+    ASSERT_EQ(IDChecker::PeekId(), 3u);
     // Id used 3
-    Params p4(&mu, &cv, nullptr, 1);
-    ASSERT_EQ(IDChecker::PeekId(), 4);
+    Params p4(&mu, &cv, nullptr, 1u);
+    ASSERT_EQ(IDChecker::PeekId(), 4u);
   }
   // id 3, 2, 1, 0 are in the free queue in order
-  ASSERT_EQ(IDChecker::PeekId(), 0);
+  ASSERT_EQ(IDChecker::PeekId(), 0u);
   // pick up 0
-  Params p1(&mu, &cv, nullptr, 1);
-  ASSERT_EQ(IDChecker::PeekId(), 1);
+  Params p1(&mu, &cv, nullptr, 1u);
+  ASSERT_EQ(IDChecker::PeekId(), 1u);
   // pick up 1
-  Params* p2 = new Params(&mu, &cv, nullptr, 1);
-  ASSERT_EQ(IDChecker::PeekId(), 2);
+  Params* p2 = new Params(&mu, &cv, nullptr, 1u);
+  ASSERT_EQ(IDChecker::PeekId(), 2u);
   // pick up 2
-  Params p3(&mu, &cv, nullptr, 1);
-  ASSERT_EQ(IDChecker::PeekId(), 3);
+  Params p3(&mu, &cv, nullptr, 1u);
+  ASSERT_EQ(IDChecker::PeekId(), 3u);
   // return up 1
   delete p2;
-  ASSERT_EQ(IDChecker::PeekId(), 1);
+  ASSERT_EQ(IDChecker::PeekId(), 1u);
   // Now we have 3, 1 in queue
   // pick up 1
-  Params p4(&mu, &cv, nullptr, 1);
-  ASSERT_EQ(IDChecker::PeekId(), 3);
+  Params p4(&mu, &cv, nullptr, 1u);
+  ASSERT_EQ(IDChecker::PeekId(), 3u);
   // pick up 3
-  Params p5(&mu, &cv, nullptr, 1);
+  Params p5(&mu, &cv, nullptr, 1u);
   // next new id
-  ASSERT_EQ(IDChecker::PeekId(), 4);
+  ASSERT_EQ(IDChecker::PeekId(), 4u);
   // After exit, id sequence in queue:
   // 3, 1, 2, 0
 }
 
 TEST(ThreadLocalTest, SequentialReadWriteTest) {
   // global id list carries over 3, 1, 2, 0
-  ASSERT_EQ(IDChecker::PeekId(), 0);
+  ASSERT_EQ(IDChecker::PeekId(), 0u);
 
   port::Mutex mu;
   port::CondVar cv(&mu);
@@ -133,7 +133,7 @@ TEST(ThreadLocalTest, SequentialReadWriteTest) {
   };
 
   for (int iter = 0; iter < 1024; ++iter) {
-    ASSERT_EQ(IDChecker::PeekId(), 1);
+    ASSERT_EQ(IDChecker::PeekId(), 1u);
     // Another new thread, read/write should not see value from previous thread
     env_->StartThread(func, static_cast<void*>(&p));
     mu.Lock();
@@ -141,13 +141,13 @@ TEST(ThreadLocalTest, SequentialReadWriteTest) {
       cv.Wait();
     }
     mu.Unlock();
-    ASSERT_EQ(IDChecker::PeekId(), 1);
+    ASSERT_EQ(IDChecker::PeekId(), 1u);
   }
 }
 
 TEST(ThreadLocalTest, ConcurrentReadWriteTest) {
   // global id list carries over 3, 1, 2, 0
-  ASSERT_EQ(IDChecker::PeekId(), 0);
+  ASSERT_EQ(IDChecker::PeekId(), 0u);
 
   ThreadLocalPtr tls2;
   port::Mutex mu1;
@@ -226,11 +226,11 @@ TEST(ThreadLocalTest, ConcurrentReadWriteTest) {
   }
   mu2.Unlock();
 
-  ASSERT_EQ(IDChecker::PeekId(), 3);
+  ASSERT_EQ(IDChecker::PeekId(), 3u);
 }
 
 TEST(ThreadLocalTest, Unref) {
-  ASSERT_EQ(IDChecker::PeekId(), 0);
+  ASSERT_EQ(IDChecker::PeekId(), 0u);
   auto unref = [](void* ptr) {
     auto& p = *static_cast<Params*>(ptr);