diff --git a/Makefile b/Makefile index 01dc3721d0..99892b7611 100644 --- a/Makefile +++ b/Makefile @@ -98,7 +98,9 @@ BENCHMARKS = db_bench_sqlite3 db_bench_tree_db table_reader_bench # The library name is configurable since we are maintaining libraries of both # debug/release mode. -LIBNAME = librocksdb +ifeq ($(LIBNAME),) + LIBNAME=librocksdb +endif LIBRARY = ${LIBNAME}.a MEMENVLIBRARY = libmemenv.a diff --git a/build_tools/make_new_version.sh b/build_tools/make_new_version.sh index 165ed0c026..1440a4fd61 100755 --- a/build_tools/make_new_version.sh +++ b/build_tools/make_new_version.sh @@ -16,7 +16,7 @@ function title() { echo -e "\033[1;32m$*\033[0m" } -usage="Create new rocksdb version and prepare it for the release process\n" +usage="Create new RocksDB version and prepare it for the release process\n" usage+="USAGE: ./make_new_version.sh " # -- Pre-check @@ -33,8 +33,8 @@ if [ $GIT_BRANCH != "master" ]; then fi title "Adding new tag for this release ..." -$TAG="$ROCKSDB_VERSION.fb" -$GIT tag -a "$TAG" -m "Rocksdb $ROCKSDB_VERSION" +TAG="$ROCKSDB_VERSION.fb" +$GIT tag -a "$TAG" -m "RocksDB $ROCKSDB_VERSION" # Setting up the proxy for remote repo access title "Pushing new tag to remote repo ..." diff --git a/db/db_impl.cc b/db/db_impl.cc index 32823072ae..f2c77ce36e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -279,15 +279,9 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) purge_wal_files_last_run_(0), last_stats_dump_time_microsec_(0), default_interval_to_delete_obsolete_WAL_(600), - stall_level0_slowdown_(0), - stall_memtable_compaction_(0), - stall_level0_num_files_(0), - stall_level0_slowdown_count_(0), - stall_memtable_compaction_count_(0), - stall_level0_num_files_count_(0), - started_at_(options.env->NowMicros()), flush_on_destroy_(false), - stats_(options.num_levels), + internal_stats_(options.num_levels, options.env, + options.statistics.get()), delayed_writes_(0), storage_options_(options), bg_work_gate_closed_(false), @@ -295,13 +289,6 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) env_->GetAbsolutePath(dbname, &db_absolute_path_); - stall_leveln_slowdown_.resize(options.num_levels); - stall_leveln_slowdown_count_.resize(options.num_levels); - for (int i = 0; i < options.num_levels; ++i) { - stall_leveln_slowdown_[i] = 0; - stall_leveln_slowdown_count_[i] = 0; - } - // Reserve ten files or so for other uses and give the rest to TableCache. 
const int table_cache_size = options_.max_open_files - 10; table_cache_.reset(new TableCache(dbname_, &options_, @@ -1081,11 +1068,11 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) { meta.smallest_seqno, meta.largest_seqno); } - CompactionStats stats; + InternalStats::CompactionStats stats; stats.micros = env_->NowMicros() - start_micros; stats.bytes_written = meta.file_size; stats.files_out_levelnp1 = 1; - stats_[level].Add(stats); + internal_stats_.AddCompactionStats(level, stats); RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES, meta.file_size); return s; } @@ -1170,10 +1157,10 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, meta.smallest_seqno, meta.largest_seqno); } - CompactionStats stats; + InternalStats::CompactionStats stats; stats.micros = env_->NowMicros() - start_micros; stats.bytes_written = meta.file_size; - stats_[level].Add(stats); + internal_stats_.AddCompactionStats(level, stats); RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES, meta.file_size); return s; } @@ -1185,8 +1172,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, if (!default_cfd_->imm()->IsFlushPending()) { Log(options_.info_log, "FlushMemTableToOutputFile already in progress"); - Status s = Status::IOError("FlushMemTableToOutputFile already in progress"); - return s; + return Status::IOError("FlushMemTableToOutputFile already in progress"); } // Save the contents of the earliest memtable as a new Table @@ -1195,8 +1181,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, default_cfd_->imm()->PickMemtablesToFlush(&mems); if (mems.empty()) { Log(options_.info_log, "Nothing in memstore to flush"); - Status s = Status::IOError("Nothing in memstore to flush"); - return s; + return Status::IOError("Nothing in memstore to flush"); } // record the logfile_number_ before we release the mutex @@ -1879,6 +1864,13 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, *madeProgress = false; mutex_.AssertHeld(); + bool is_manual = (manual_compaction_ != nullptr) && + (manual_compaction_->in_progress == false); + if (is_manual) { + // another thread cannot pick up the same work + manual_compaction_->in_progress = true; + } + // TODO: remove memtable flush from formal compaction while (default_cfd_->imm()->IsFlushPending()) { Log(options_.info_log, @@ -1887,19 +1879,22 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, options_.max_background_compactions - bg_compaction_scheduled_); Status stat = FlushMemTableToOutputFile(madeProgress, deletion_state); if (!stat.ok()) { + if (is_manual) { + manual_compaction_->status = stat; + manual_compaction_->done = true; + manual_compaction_->in_progress = false; + manual_compaction_ = nullptr; + } return stat; } } unique_ptr c; - bool is_manual = (manual_compaction_ != nullptr) && - (manual_compaction_->in_progress == false); InternalKey manual_end_storage; InternalKey* manual_end = &manual_end_storage; if (is_manual) { ManualCompaction* m = manual_compaction_; - assert(!m->in_progress); - m->in_progress = true; // another thread cannot pick up the same work + assert(m->in_progress); c.reset(versions_->CompactRange( m->input_level, m->output_level, m->begin, m->end, &manual_end)); if (!c) { @@ -2559,7 +2554,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, if (!options_.disableDataSync) { db_directory_->Fsync(); } - CompactionStats stats; + + InternalStats::CompactionStats stats; stats.micros = env_->NowMicros() - start_micros - imm_micros; 
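The BackgroundCompaction hunk above claims a manual compaction (in_progress = true) before the memtable-flush loop, so a failure inside that loop can report its status back to the waiting caller and no second background thread picks up the same request. A minimal standalone sketch of that claim protocol, with illustrative names rather than the real DBImpl members:

// Standalone sketch of the "claim early" protocol introduced above
// (simplified; ManualCompactionRequest and the globals are illustrative).
#include <cassert>
#include <mutex>

struct ManualCompactionRequest {
  bool in_progress = false;  // set by the worker that owns the request
  bool done = false;
};

std::mutex db_mutex;
ManualCompactionRequest* pending_manual = nullptr;

// Returns true if this worker claimed the manual request. Claiming before
// any flush work means a second background thread observing the same
// pointer sees in_progress == true and leaves it alone.
bool ClaimManualCompaction() {
  std::lock_guard<std::mutex> l(db_mutex);
  if (pending_manual == nullptr || pending_manual->in_progress) {
    return false;  // nothing to do, or another thread already owns it
  }
  pending_manual->in_progress = true;
  return true;
}

int main() {
  ManualCompactionRequest req;
  pending_manual = &req;
  assert(ClaimManualCompaction());    // first worker claims it
  assert(!ClaimManualCompaction());   // second worker must not pick it up
}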
MeasureTime(options_.statistics.get(), COMPACTION_TIME, stats.micros); stats.files_in_leveln = compact->compaction->num_input_files(0); @@ -2593,7 +2589,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, LogFlush(options_.info_log); mutex_.Lock(); - stats_[compact->compaction->output_level()].Add(stats); + internal_stats_.AddCompactionStats(compact->compaction->output_level(), + stats); // if there were any unused file number (mostly in case of // compaction error), free up the entry from pending_putputs @@ -3334,8 +3331,7 @@ Status DBImpl::MakeRoomForWrite(bool force, delayed = sw.ElapsedMicros(); } RecordTick(options_.statistics.get(), STALL_L0_SLOWDOWN_MICROS, delayed); - stall_level0_slowdown_ += delayed; - stall_level0_slowdown_count_++; + internal_stats_.RecordWriteStall(InternalStats::LEVEL0_SLOWDOWN, delayed); allow_delay = false; // Do not delay a single write more than once mutex_.Lock(); delayed_writes_++; @@ -3361,8 +3357,8 @@ Status DBImpl::MakeRoomForWrite(bool force, } RecordTick(options_.statistics.get(), STALL_MEMTABLE_COMPACTION_MICROS, stall); - stall_memtable_compaction_ += stall; - stall_memtable_compaction_count_++; + internal_stats_.RecordWriteStall(InternalStats::MEMTABLE_COMPACTION, + stall); } else if (default_cfd_->current()->NumLevelFiles(0) >= options_.level0_stop_writes_trigger) { // There are too many level-0 files. @@ -3376,8 +3372,7 @@ Status DBImpl::MakeRoomForWrite(bool force, stall = sw.ElapsedMicros(); } RecordTick(options_.statistics.get(), STALL_L0_NUM_FILES_MICROS, stall); - stall_level0_num_files_ += stall; - stall_level0_num_files_count_++; + internal_stats_.RecordWriteStall(InternalStats::LEVEL0_NUM_FILES, stall); } else if (allow_hard_rate_limit_delay && options_.hard_rate_limit > 1.0 && (score = default_cfd_->current()->MaxCompactionScore()) > options_.hard_rate_limit) { @@ -3391,8 +3386,7 @@ Status DBImpl::MakeRoomForWrite(bool force, env_->SleepForMicroseconds(1000); delayed = sw.ElapsedMicros(); } - stall_leveln_slowdown_[max_level] += delayed; - stall_leveln_slowdown_count_[max_level]++; + internal_stats_.RecordLevelNSlowdown(max_level, delayed); // Make sure the following value doesn't round to zero. 
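The MakeRoomForWrite changes above route every stall through InternalStats instead of ad-hoc member counters: time the wait, then record one (cause, micros) sample. A self-contained sketch of that pattern, using std::chrono in place of Env::NowMicros and an illustrative StallRecorder type:

// Sketch of the stall-accounting pattern: measure the wait, then push one
// sample into a per-cause accumulator. StallRecorder is illustrative; the
// real code calls InternalStats::RecordWriteStall.
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <thread>

enum WriteStallType { kLevel0Slowdown, kMemtableCompaction, kLevel0NumFiles, kNumStallTypes };

struct StallRecorder {
  uint64_t micros[kNumStallTypes] = {};
  uint64_t counts[kNumStallTypes] = {};
  void Record(WriteStallType type, uint64_t stall_micros) {
    micros[type] += stall_micros;  // total stalled time per cause
    counts[type] += 1;             // number of stalls per cause
  }
};

int main() {
  StallRecorder stats;
  auto start = std::chrono::steady_clock::now();
  std::this_thread::sleep_for(std::chrono::milliseconds(1));  // simulated slowdown sleep
  auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(
      std::chrono::steady_clock::now() - start).count();
  stats.Record(kLevel0Slowdown, static_cast<uint64_t>(elapsed));
  std::printf("level0_slowdown: %llu stalls, %llu us total\n",
              (unsigned long long)stats.counts[kLevel0Slowdown],
              (unsigned long long)stats.micros[kLevel0Slowdown]);
}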
uint64_t rate_limit = std::max((delayed / 1000), (uint64_t) 1); rate_limit_delay_millis += rate_limit; @@ -3491,297 +3485,10 @@ const Options& DBImpl::GetOptions(const ColumnFamilyHandle& column_family) bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family, const Slice& property, std::string* value) { value->clear(); - MutexLock l(&mutex_); - Version* current = default_cfd_->current(); - Slice in = property; - Slice prefix("rocksdb."); - if (!in.starts_with(prefix)) return false; - in.remove_prefix(prefix.size()); - - if (in.starts_with("num-files-at-level")) { - in.remove_prefix(strlen("num-files-at-level")); - uint64_t level; - bool ok = ConsumeDecimalNumber(&in, &level) && in.empty(); - if (!ok || (int)level >= NumberLevels()) { - return false; - } else { - char buf[100]; - snprintf(buf, sizeof(buf), "%d", - current->NumLevelFiles(static_cast(level))); - *value = buf; - return true; - } - } else if (in == "levelstats") { - char buf[1000]; - snprintf(buf, sizeof(buf), - "Level Files Size(MB)\n" - "--------------------\n"); - value->append(buf); - - for (int level = 0; level < NumberLevels(); level++) { - snprintf(buf, sizeof(buf), - "%3d %8d %8.0f\n", - level, - current->NumLevelFiles(level), - current->NumLevelBytes(level) / 1048576.0); - value->append(buf); - } - return true; - - } else if (in == "stats") { - char buf[1000]; - - uint64_t wal_bytes = 0; - uint64_t wal_synced = 0; - uint64_t user_bytes_written = 0; - uint64_t write_other = 0; - uint64_t write_self = 0; - uint64_t write_with_wal = 0; - uint64_t total_bytes_written = 0; - uint64_t total_bytes_read = 0; - uint64_t micros_up = env_->NowMicros() - started_at_; - // Add "+1" to make sure seconds_up is > 0 and avoid NaN later - double seconds_up = (micros_up + 1) / 1000000.0; - uint64_t total_slowdown = 0; - uint64_t total_slowdown_count = 0; - uint64_t interval_bytes_written = 0; - uint64_t interval_bytes_read = 0; - uint64_t interval_bytes_new = 0; - double interval_seconds_up = 0; - - Statistics* s = options_.statistics.get(); - if (s) { - wal_bytes = s->getTickerCount(WAL_FILE_BYTES); - wal_synced = s->getTickerCount(WAL_FILE_SYNCED); - user_bytes_written = s->getTickerCount(BYTES_WRITTEN); - write_other = s->getTickerCount(WRITE_DONE_BY_OTHER); - write_self = s->getTickerCount(WRITE_DONE_BY_SELF); - write_with_wal = s->getTickerCount(WRITE_WITH_WAL); - } - - // Pardon the long line but I think it is easier to read this way. - snprintf(buf, sizeof(buf), - " Compactions\n" - "Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count msComp msStall Ln-stall Stall-cnt\n" - "------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n" - ); - value->append(buf); - for (int level = 0; level < current->NumberLevels(); level++) { - int files = current->NumLevelFiles(level); - if (stats_[level].micros > 0 || files > 0) { - int64_t bytes_read = stats_[level].bytes_readn + - stats_[level].bytes_readnp1; - int64_t bytes_new = stats_[level].bytes_written - - stats_[level].bytes_readnp1; - double amplify = (stats_[level].bytes_readn == 0) - ? 0.0 - : (stats_[level].bytes_written + - stats_[level].bytes_readnp1 + - stats_[level].bytes_readn) / - (double) stats_[level].bytes_readn; - - total_bytes_read += bytes_read; - total_bytes_written += stats_[level].bytes_written; - - uint64_t stalls = level == 0 ? 
- (stall_level0_slowdown_count_ + - stall_level0_num_files_count_ + - stall_memtable_compaction_count_) : - stall_leveln_slowdown_count_[level]; - - double stall_us = level == 0 ? - (stall_level0_slowdown_ + - stall_level0_num_files_ + - stall_memtable_compaction_) : - stall_leveln_slowdown_[level]; - - snprintf( - buf, sizeof(buf), - "%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %8d %9.1f %9.1f %9lu\n", - level, - files, - current->NumLevelBytes(level) / 1048576.0, - current->NumLevelBytes(level) / - versions_->MaxBytesForLevel(level), - stats_[level].micros / 1e6, - bytes_read / 1048576.0, - stats_[level].bytes_written / 1048576.0, - stats_[level].bytes_readn / 1048576.0, - stats_[level].bytes_readnp1 / 1048576.0, - bytes_new / 1048576.0, - amplify, - // +1 to avoid division by 0 - (bytes_read / 1048576.0) / ((stats_[level].micros+1) / 1000000.0), - (stats_[level].bytes_written / 1048576.0) / - ((stats_[level].micros+1) / 1000000.0), - stats_[level].files_in_leveln, - stats_[level].files_in_levelnp1, - stats_[level].files_out_levelnp1, - stats_[level].files_out_levelnp1 - stats_[level].files_in_levelnp1, - stats_[level].count, - (int) ((double) stats_[level].micros / - 1000.0 / - (stats_[level].count + 1)), - (double) stall_us / 1000.0 / (stalls + 1), - stall_us / 1000000.0, - (unsigned long) stalls); - - total_slowdown += stall_leveln_slowdown_[level]; - total_slowdown_count += stall_leveln_slowdown_count_[level]; - value->append(buf); - } - } - - interval_bytes_new = user_bytes_written - last_stats_.ingest_bytes_; - interval_bytes_read = total_bytes_read - last_stats_.compaction_bytes_read_; - interval_bytes_written = - total_bytes_written - last_stats_.compaction_bytes_written_; - interval_seconds_up = seconds_up - last_stats_.seconds_up_; - - snprintf(buf, sizeof(buf), "Uptime(secs): %.1f total, %.1f interval\n", - seconds_up, interval_seconds_up); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Writes cumulative: %llu total, %llu batches, " - "%.1f per batch, %.2f ingest GB\n", - (unsigned long long) (write_other + write_self), - (unsigned long long) write_self, - (write_other + write_self) / (double) (write_self + 1), - user_bytes_written / (1048576.0 * 1024)); - value->append(buf); - - snprintf(buf, sizeof(buf), - "WAL cumulative: %llu WAL writes, %llu WAL syncs, " - "%.2f writes per sync, %.2f GB written\n", - (unsigned long long) write_with_wal, - (unsigned long long ) wal_synced, - write_with_wal / (double) (wal_synced + 1), - wal_bytes / (1048576.0 * 1024)); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Compaction IO cumulative (GB): " - "%.2f new, %.2f read, %.2f write, %.2f read+write\n", - user_bytes_written / (1048576.0 * 1024), - total_bytes_read / (1048576.0 * 1024), - total_bytes_written / (1048576.0 * 1024), - (total_bytes_read + total_bytes_written) / (1048576.0 * 1024)); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Compaction IO cumulative (MB/sec): " - "%.1f new, %.1f read, %.1f write, %.1f read+write\n", - user_bytes_written / 1048576.0 / seconds_up, - total_bytes_read / 1048576.0 / seconds_up, - total_bytes_written / 1048576.0 / seconds_up, - (total_bytes_read + total_bytes_written) / 1048576.0 / seconds_up); - value->append(buf); - - // +1 to avoid divide by 0 and NaN - snprintf(buf, sizeof(buf), - "Amplification cumulative: %.1f write, %.1f compaction\n", - (double) (total_bytes_written + wal_bytes) - / (user_bytes_written + 1), - (double) (total_bytes_written + total_bytes_read + 
wal_bytes) - / (user_bytes_written + 1)); - value->append(buf); - - uint64_t interval_write_other = write_other - last_stats_.write_other_; - uint64_t interval_write_self = write_self - last_stats_.write_self_; - - snprintf(buf, sizeof(buf), - "Writes interval: %llu total, %llu batches, " - "%.1f per batch, %.1f ingest MB\n", - (unsigned long long) (interval_write_other + interval_write_self), - (unsigned long long) interval_write_self, - (double) (interval_write_other + interval_write_self) - / (interval_write_self + 1), - (user_bytes_written - last_stats_.ingest_bytes_) / 1048576.0); - value->append(buf); - - uint64_t interval_write_with_wal = - write_with_wal - last_stats_.write_with_wal_; - - uint64_t interval_wal_synced = wal_synced - last_stats_.wal_synced_; - uint64_t interval_wal_bytes = wal_bytes - last_stats_.wal_bytes_; - - snprintf(buf, sizeof(buf), - "WAL interval: %llu WAL writes, %llu WAL syncs, " - "%.2f writes per sync, %.2f MB written\n", - (unsigned long long) interval_write_with_wal, - (unsigned long long ) interval_wal_synced, - interval_write_with_wal / (double) (interval_wal_synced + 1), - interval_wal_bytes / (1048576.0 * 1024)); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Compaction IO interval (MB): " - "%.2f new, %.2f read, %.2f write, %.2f read+write\n", - interval_bytes_new / 1048576.0, - interval_bytes_read/ 1048576.0, - interval_bytes_written / 1048576.0, - (interval_bytes_read + interval_bytes_written) / 1048576.0); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Compaction IO interval (MB/sec): " - "%.1f new, %.1f read, %.1f write, %.1f read+write\n", - interval_bytes_new / 1048576.0 / interval_seconds_up, - interval_bytes_read / 1048576.0 / interval_seconds_up, - interval_bytes_written / 1048576.0 / interval_seconds_up, - (interval_bytes_read + interval_bytes_written) - / 1048576.0 / interval_seconds_up); - value->append(buf); - - // +1 to avoid divide by 0 and NaN - snprintf(buf, sizeof(buf), - "Amplification interval: %.1f write, %.1f compaction\n", - (double) (interval_bytes_written + wal_bytes) - / (interval_bytes_new + 1), - (double) (interval_bytes_written + interval_bytes_read + wal_bytes) - / (interval_bytes_new + 1)); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Stalls(secs): %.3f level0_slowdown, %.3f level0_numfiles, " - "%.3f memtable_compaction, %.3f leveln_slowdown\n", - stall_level0_slowdown_ / 1000000.0, - stall_level0_num_files_ / 1000000.0, - stall_memtable_compaction_ / 1000000.0, - total_slowdown / 1000000.0); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Stalls(count): %lu level0_slowdown, %lu level0_numfiles, " - "%lu memtable_compaction, %lu leveln_slowdown\n", - (unsigned long) stall_level0_slowdown_count_, - (unsigned long) stall_level0_num_files_count_, - (unsigned long) stall_memtable_compaction_count_, - (unsigned long) total_slowdown_count); - value->append(buf); - - last_stats_.compaction_bytes_read_ = total_bytes_read; - last_stats_.compaction_bytes_written_ = total_bytes_written; - last_stats_.ingest_bytes_ = user_bytes_written; - last_stats_.seconds_up_ = seconds_up; - last_stats_.wal_bytes_ = wal_bytes; - last_stats_.wal_synced_ = wal_synced; - last_stats_.write_with_wal_ = write_with_wal; - last_stats_.write_other_ = write_other; - last_stats_.write_self_ = write_self; - - return true; - } else if (in == "sstables") { - *value = default_cfd_->current()->DebugString(); - return true; - } else if (in == "num-immutable-mem-table") { - *value = 
std::to_string(default_cfd_->imm()->size()); - return true; - } - - return false; + return internal_stats_.GetProperty(property, value, versions_.get(), + default_cfd_->current(), + default_cfd_->imm()->size()); } void DBImpl::GetApproximateSizes(const ColumnFamilyHandle& column_family, diff --git a/db/db_impl.h b/db/db_impl.h index b237a4e891..68bfa84ead 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -25,6 +25,7 @@ #include "util/stats_logger.h" #include "memtablelist.h" #include "util/autovector.h" +#include "db/internal_stats.h" namespace rocksdb { @@ -470,88 +471,9 @@ class DBImpl : public DB { // enabled and archive size_limit is disabled. uint64_t default_interval_to_delete_obsolete_WAL_; - // These count the number of microseconds for which MakeRoomForWrite stalls. - uint64_t stall_level0_slowdown_; - uint64_t stall_memtable_compaction_; - uint64_t stall_level0_num_files_; - std::vector stall_leveln_slowdown_; - uint64_t stall_level0_slowdown_count_; - uint64_t stall_memtable_compaction_count_; - uint64_t stall_level0_num_files_count_; - std::vector stall_leveln_slowdown_count_; - - // Time at which this instance was started. - const uint64_t started_at_; - bool flush_on_destroy_; // Used when disableWAL is true. - // Per level compaction stats. stats_[level] stores the stats for - // compactions that produced data for the specified "level". - struct CompactionStats { - uint64_t micros; - - // Bytes read from level N during compaction between levels N and N+1 - int64_t bytes_readn; - - // Bytes read from level N+1 during compaction between levels N and N+1 - int64_t bytes_readnp1; - - // Total bytes written during compaction between levels N and N+1 - int64_t bytes_written; - - // Files read from level N during compaction between levels N and N+1 - int files_in_leveln; - - // Files read from level N+1 during compaction between levels N and N+1 - int files_in_levelnp1; - - // Files written during compaction between levels N and N+1 - int files_out_levelnp1; - - // Number of compactions done - int count; - - CompactionStats() : micros(0), bytes_readn(0), bytes_readnp1(0), - bytes_written(0), files_in_leveln(0), - files_in_levelnp1(0), files_out_levelnp1(0), - count(0) { } - - void Add(const CompactionStats& c) { - this->micros += c.micros; - this->bytes_readn += c.bytes_readn; - this->bytes_readnp1 += c.bytes_readnp1; - this->bytes_written += c.bytes_written; - this->files_in_leveln += c.files_in_leveln; - this->files_in_levelnp1 += c.files_in_levelnp1; - this->files_out_levelnp1 += c.files_out_levelnp1; - this->count += 1; - } - }; - - std::vector stats_; - - // Used to compute per-interval statistics - struct StatsSnapshot { - uint64_t compaction_bytes_read_; // Bytes read by compaction - uint64_t compaction_bytes_written_; // Bytes written by compaction - uint64_t ingest_bytes_; // Bytes written by user - uint64_t wal_bytes_; // Bytes written to WAL - uint64_t wal_synced_; // Number of times WAL is synced - uint64_t write_with_wal_; // Number of writes that request WAL - // These count the number of writes processed by the calling thread or - // another thread. 
- uint64_t write_other_; - uint64_t write_self_; - double seconds_up_; - - StatsSnapshot() : compaction_bytes_read_(0), compaction_bytes_written_(0), - ingest_bytes_(0), wal_bytes_(0), wal_synced_(0), - write_with_wal_(0), write_other_(0), write_self_(0), - seconds_up_(0) {} - }; - - // Counters from the previous time per-interval stats were computed - StatsSnapshot last_stats_; + InternalStats internal_stats_; static const int KEEP_LOG_FILE_NUM = 1000; std::string db_absolute_path_; diff --git a/db/internal_stats.cc b/db/internal_stats.cc new file mode 100644 index 0000000000..d946e6f270 --- /dev/null +++ b/db/internal_stats.cc @@ -0,0 +1,298 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/internal_stats.h" + +#include <vector> + +namespace rocksdb { + +bool InternalStats::GetProperty(const Slice& property, std::string* value, + VersionSet* version_set, Version* current, + int immsize) { + Slice in = property; + Slice prefix("rocksdb."); + if (!in.starts_with(prefix)) return false; + in.remove_prefix(prefix.size()); + + if (in.starts_with("num-files-at-level")) { + in.remove_prefix(strlen("num-files-at-level")); + uint64_t level; + bool ok = ConsumeDecimalNumber(&in, &level) && in.empty(); + if (!ok || (int)level >= number_levels_) { + return false; + } else { + char buf[100]; + snprintf(buf, sizeof(buf), "%d", + current->NumLevelFiles(static_cast<int>(level))); + *value = buf; + return true; + } + } else if (in == "levelstats") { + char buf[1000]; + snprintf(buf, sizeof(buf), + "Level Files Size(MB)\n" + "--------------------\n"); + value->append(buf); + + for (int level = 0; level < number_levels_; level++) { + snprintf(buf, sizeof(buf), "%3d %8d %8.0f\n", level, + current->NumLevelFiles(level), + current->NumLevelBytes(level) / 1048576.0); + value->append(buf); + } + return true; + + } else if (in == "stats") { + char buf[1000]; + + uint64_t wal_bytes = 0; + uint64_t wal_synced = 0; + uint64_t user_bytes_written = 0; + uint64_t write_other = 0; + uint64_t write_self = 0; + uint64_t write_with_wal = 0; + uint64_t total_bytes_written = 0; + uint64_t total_bytes_read = 0; + uint64_t micros_up = env_->NowMicros() - started_at_; + // Add "+1" to make sure seconds_up is > 0 and avoid NaN later + double seconds_up = (micros_up + 1) / 1000000.0; + uint64_t total_slowdown = 0; + uint64_t total_slowdown_count = 0; + uint64_t interval_bytes_written = 0; + uint64_t interval_bytes_read = 0; + uint64_t interval_bytes_new = 0; + double interval_seconds_up = 0; + + if (statistics_) { + wal_bytes = statistics_->getTickerCount(WAL_FILE_BYTES); + wal_synced = statistics_->getTickerCount(WAL_FILE_SYNCED); + user_bytes_written = statistics_->getTickerCount(BYTES_WRITTEN); + write_other = statistics_->getTickerCount(WRITE_DONE_BY_OTHER); + write_self = statistics_->getTickerCount(WRITE_DONE_BY_SELF); + write_with_wal = statistics_->getTickerCount(WRITE_WITH_WAL); + } + + // Pardon the long line but I think it is easier to read this way.
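For context, the property strings parsed here ("rocksdb.num-files-at-level<N>", "rocksdb.levelstats", "rocksdb.stats", ...) are what callers pass through the public API. A rough usage sketch, assuming the plain DB::GetProperty(property, &value) overload and a writable /tmp path; details vary by RocksDB version:

// Illustrative only: query the properties assembled by InternalStats.
#include <cassert>
#include <iostream>
#include <string>
#include "rocksdb/db.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/prop_demo", &db);
  assert(s.ok());

  std::string v;
  if (db->GetProperty("rocksdb.num-files-at-level0", &v)) {
    std::cout << "L0 files: " << v << "\n";
  }
  if (db->GetProperty("rocksdb.stats", &v)) {
    std::cout << v;  // the multi-line report built by the "stats" branch above
  }
  delete db;
}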
+ snprintf(buf, sizeof(buf), + " Compactions\n" + "Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count msComp msStall Ln-stall Stall-cnt\n" + "------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n" + ); + value->append(buf); + for (int level = 0; level < number_levels_; level++) { + int files = current->NumLevelFiles(level); + if (compaction_stats_[level].micros > 0 || files > 0) { + int64_t bytes_read = compaction_stats_[level].bytes_readn + + compaction_stats_[level].bytes_readnp1; + int64_t bytes_new = compaction_stats_[level].bytes_written - + compaction_stats_[level].bytes_readnp1; + double amplify = (compaction_stats_[level].bytes_readn == 0) + ? 0.0 + : (compaction_stats_[level].bytes_written + + compaction_stats_[level].bytes_readnp1 + + compaction_stats_[level].bytes_readn) / + (double)compaction_stats_[level].bytes_readn; + + total_bytes_read += bytes_read; + total_bytes_written += compaction_stats_[level].bytes_written; + + uint64_t stalls = level == 0 ? (stall_counts_[LEVEL0_SLOWDOWN] + + stall_counts_[LEVEL0_NUM_FILES] + + stall_counts_[MEMTABLE_COMPACTION]) + : stall_leveln_slowdown_count_[level]; + + double stall_us = level == 0 ? (stall_micros_[LEVEL0_SLOWDOWN] + + stall_micros_[LEVEL0_NUM_FILES] + + stall_micros_[MEMTABLE_COMPACTION]) + : stall_leveln_slowdown_[level]; + + snprintf(buf, sizeof(buf), + "%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f " + "%10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %8d %9.1f %9.1f " + "%9lu\n", + level, files, current->NumLevelBytes(level) / 1048576.0, + current->NumLevelBytes(level) / + version_set->MaxBytesForLevel(level), + compaction_stats_[level].micros / 1e6, bytes_read / 1048576.0, + compaction_stats_[level].bytes_written / 1048576.0, + compaction_stats_[level].bytes_readn / 1048576.0, + compaction_stats_[level].bytes_readnp1 / 1048576.0, + bytes_new / 1048576.0, amplify, + // +1 to avoid division by 0 + (bytes_read / 1048576.0) / + ((compaction_stats_[level].micros + 1) / 1000000.0), + (compaction_stats_[level].bytes_written / 1048576.0) / + ((compaction_stats_[level].micros + 1) / 1000000.0), + compaction_stats_[level].files_in_leveln, + compaction_stats_[level].files_in_levelnp1, + compaction_stats_[level].files_out_levelnp1, + compaction_stats_[level].files_out_levelnp1 - + compaction_stats_[level].files_in_levelnp1, + compaction_stats_[level].count, + (int)((double)compaction_stats_[level].micros / 1000.0 / + (compaction_stats_[level].count + 1)), + (double)stall_us / 1000.0 / (stalls + 1), stall_us / 1000000.0, + (unsigned long)stalls); + total_slowdown += stall_leveln_slowdown_[level]; + total_slowdown_count += stall_leveln_slowdown_count_[level]; + value->append(buf); + } + } + + interval_bytes_new = user_bytes_written - last_stats_.ingest_bytes_; + interval_bytes_read = total_bytes_read - last_stats_.compaction_bytes_read_; + interval_bytes_written = + total_bytes_written - last_stats_.compaction_bytes_written_; + interval_seconds_up = seconds_up - last_stats_.seconds_up_; + + snprintf(buf, sizeof(buf), "Uptime(secs): %.1f total, %.1f interval\n", + seconds_up, interval_seconds_up); + value->append(buf); + + snprintf(buf, sizeof(buf), + "Writes cumulative: %llu total, %llu batches, " + "%.1f per batch, %.2f ingest GB\n", + (unsigned long long)(write_other + write_self), + (unsigned 
long long)write_self, + (write_other + write_self) / (double)(write_self + 1), + user_bytes_written / (1048576.0 * 1024)); + value->append(buf); + + snprintf(buf, sizeof(buf), + "WAL cumulative: %llu WAL writes, %llu WAL syncs, " + "%.2f writes per sync, %.2f GB written\n", + (unsigned long long)write_with_wal, (unsigned long long)wal_synced, + write_with_wal / (double)(wal_synced + 1), + wal_bytes / (1048576.0 * 1024)); + value->append(buf); + + snprintf(buf, sizeof(buf), + "Compaction IO cumulative (GB): " + "%.2f new, %.2f read, %.2f write, %.2f read+write\n", + user_bytes_written / (1048576.0 * 1024), + total_bytes_read / (1048576.0 * 1024), + total_bytes_written / (1048576.0 * 1024), + (total_bytes_read + total_bytes_written) / (1048576.0 * 1024)); + value->append(buf); + + snprintf(buf, sizeof(buf), + "Compaction IO cumulative (MB/sec): " + "%.1f new, %.1f read, %.1f write, %.1f read+write\n", + user_bytes_written / 1048576.0 / seconds_up, + total_bytes_read / 1048576.0 / seconds_up, + total_bytes_written / 1048576.0 / seconds_up, + (total_bytes_read + total_bytes_written) / 1048576.0 / seconds_up); + value->append(buf); + + // +1 to avoid divide by 0 and NaN + snprintf( + buf, sizeof(buf), + "Amplification cumulative: %.1f write, %.1f compaction\n", + (double)(total_bytes_written + wal_bytes) / (user_bytes_written + 1), + (double)(total_bytes_written + total_bytes_read + wal_bytes) / + (user_bytes_written + 1)); + value->append(buf); + + uint64_t interval_write_other = write_other - last_stats_.write_other_; + uint64_t interval_write_self = write_self - last_stats_.write_self_; + + snprintf(buf, sizeof(buf), + "Writes interval: %llu total, %llu batches, " + "%.1f per batch, %.1f ingest MB\n", + (unsigned long long)(interval_write_other + interval_write_self), + (unsigned long long)interval_write_self, + (double)(interval_write_other + interval_write_self) / + (interval_write_self + 1), + (user_bytes_written - last_stats_.ingest_bytes_) / 1048576.0); + value->append(buf); + + uint64_t interval_write_with_wal = + write_with_wal - last_stats_.write_with_wal_; + + uint64_t interval_wal_synced = wal_synced - last_stats_.wal_synced_; + uint64_t interval_wal_bytes = wal_bytes - last_stats_.wal_bytes_; + + snprintf(buf, sizeof(buf), + "WAL interval: %llu WAL writes, %llu WAL syncs, " + "%.2f writes per sync, %.2f MB written\n", + (unsigned long long)interval_write_with_wal, + (unsigned long long)interval_wal_synced, + interval_write_with_wal / (double)(interval_wal_synced + 1), + interval_wal_bytes / (1048576.0 * 1024)); + value->append(buf); + + snprintf(buf, sizeof(buf), + "Compaction IO interval (MB): " + "%.2f new, %.2f read, %.2f write, %.2f read+write\n", + interval_bytes_new / 1048576.0, interval_bytes_read / 1048576.0, + interval_bytes_written / 1048576.0, + (interval_bytes_read + interval_bytes_written) / 1048576.0); + value->append(buf); + + snprintf(buf, sizeof(buf), + "Compaction IO interval (MB/sec): " + "%.1f new, %.1f read, %.1f write, %.1f read+write\n", + interval_bytes_new / 1048576.0 / interval_seconds_up, + interval_bytes_read / 1048576.0 / interval_seconds_up, + interval_bytes_written / 1048576.0 / interval_seconds_up, + (interval_bytes_read + interval_bytes_written) / 1048576.0 / + interval_seconds_up); + value->append(buf); + + // +1 to avoid divide by 0 and NaN + snprintf( + buf, sizeof(buf), + "Amplification interval: %.1f write, %.1f compaction\n", + (double)(interval_bytes_written + wal_bytes) / (interval_bytes_new + 1), + (double)(interval_bytes_written + 
interval_bytes_read + wal_bytes) / + (interval_bytes_new + 1)); + value->append(buf); + + snprintf(buf, sizeof(buf), + "Stalls(secs): %.3f level0_slowdown, %.3f level0_numfiles, " + "%.3f memtable_compaction, %.3f leveln_slowdown\n", + stall_micros_[LEVEL0_SLOWDOWN] / 1000000.0, + stall_micros_[LEVEL0_NUM_FILES] / 1000000.0, + stall_micros_[MEMTABLE_COMPACTION] / 1000000.0, + total_slowdown / 1000000.0); + value->append(buf); + + snprintf(buf, sizeof(buf), + "Stalls(count): %lu level0_slowdown, %lu level0_numfiles, " + "%lu memtable_compaction, %lu leveln_slowdown\n", + (unsigned long)stall_counts_[LEVEL0_SLOWDOWN], + (unsigned long)stall_counts_[LEVEL0_NUM_FILES], + (unsigned long)stall_counts_[MEMTABLE_COMPACTION], + (unsigned long)total_slowdown_count); + value->append(buf); + + last_stats_.compaction_bytes_read_ = total_bytes_read; + last_stats_.compaction_bytes_written_ = total_bytes_written; + last_stats_.ingest_bytes_ = user_bytes_written; + last_stats_.seconds_up_ = seconds_up; + last_stats_.wal_bytes_ = wal_bytes; + last_stats_.wal_synced_ = wal_synced; + last_stats_.write_with_wal_ = write_with_wal; + last_stats_.write_other_ = write_other; + last_stats_.write_self_ = write_self; + + return true; + } else if (in == "sstables") { + *value = current->DebugString(); + return true; + } else if (in == "num-immutable-mem-table") { + *value = std::to_string(immsize); + return true; + } + + return false; +} + +} // namespace rocksdb diff --git a/db/internal_stats.h b/db/internal_stats.h new file mode 100644 index 0000000000..ca49294e4c --- /dev/null +++ b/db/internal_stats.h @@ -0,0 +1,149 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// + +#pragma once +#include "rocksdb/statistics.h" +#include "util/statistics.h" +#include "db/version_set.h" + +#include <vector> +#include <string> + +namespace rocksdb { +class InternalStats { + public: + enum WriteStallType { + LEVEL0_SLOWDOWN, + MEMTABLE_COMPACTION, + LEVEL0_NUM_FILES, + WRITE_STALLS_ENUM_MAX, + }; + + InternalStats(int num_levels, Env* env, Statistics* statistics) + : compaction_stats_(num_levels), + stall_micros_(WRITE_STALLS_ENUM_MAX, 0), + stall_counts_(WRITE_STALLS_ENUM_MAX, 0), + stall_leveln_slowdown_(num_levels, 0), + stall_leveln_slowdown_count_(num_levels, 0), + number_levels_(num_levels), + statistics_(statistics), + env_(env), + started_at_(env->NowMicros()) {} + + // Per level compaction stats. compaction_stats_[level] stores the stats for + // compactions that produced data for the specified "level".
+ struct CompactionStats { + uint64_t micros; + + // Bytes read from level N during compaction between levels N and N+1 + int64_t bytes_readn; + + // Bytes read from level N+1 during compaction between levels N and N+1 + int64_t bytes_readnp1; + + // Total bytes written during compaction between levels N and N+1 + int64_t bytes_written; + + // Files read from level N during compaction between levels N and N+1 + int files_in_leveln; + + // Files read from level N+1 during compaction between levels N and N+1 + int files_in_levelnp1; + + // Files written during compaction between levels N and N+1 + int files_out_levelnp1; + + // Number of compactions done + int count; + + CompactionStats() + : micros(0), + bytes_readn(0), + bytes_readnp1(0), + bytes_written(0), + files_in_leveln(0), + files_in_levelnp1(0), + files_out_levelnp1(0), + count(0) {} + + void Add(const CompactionStats& c) { + this->micros += c.micros; + this->bytes_readn += c.bytes_readn; + this->bytes_readnp1 += c.bytes_readnp1; + this->bytes_written += c.bytes_written; + this->files_in_leveln += c.files_in_leveln; + this->files_in_levelnp1 += c.files_in_levelnp1; + this->files_out_levelnp1 += c.files_out_levelnp1; + this->count += 1; + } + }; + + void AddCompactionStats(int level, const CompactionStats& stats) { + compaction_stats_[level].Add(stats); + } + + void RecordWriteStall(WriteStallType write_stall_type, uint64_t micros) { + stall_micros_[write_stall_type] += micros; + stall_counts_[write_stall_type]++; + } + + void RecordLevelNSlowdown(int level, uint64_t micros) { + stall_leveln_slowdown_[level] += micros; + stall_leveln_slowdown_count_[level]++; + } + + bool GetProperty(const Slice& property, std::string* value, + VersionSet* version_set, Version* current, int immsize); + + private: + std::vector<CompactionStats> compaction_stats_; + + // Used to compute per-interval statistics + struct StatsSnapshot { + uint64_t compaction_bytes_read_; // Bytes read by compaction + uint64_t compaction_bytes_written_; // Bytes written by compaction + uint64_t ingest_bytes_; // Bytes written by user + uint64_t wal_bytes_; // Bytes written to WAL + uint64_t wal_synced_; // Number of times WAL is synced + uint64_t write_with_wal_; // Number of writes that request WAL + // These count the number of writes processed by the calling thread or + // another thread. + uint64_t write_other_; + uint64_t write_self_; + double seconds_up_; + + StatsSnapshot() + : compaction_bytes_read_(0), + compaction_bytes_written_(0), + ingest_bytes_(0), + wal_bytes_(0), + wal_synced_(0), + write_with_wal_(0), + write_other_(0), + write_self_(0), + seconds_up_(0) {} + }; + + // Counters from the previous time per-interval stats were computed + StatsSnapshot last_stats_; + + // These count the number of microseconds for which MakeRoomForWrite stalls.
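AddCompactionStats above folds one finished compaction's numbers into the running per-level totals; note that Add() bumps count by exactly one per call rather than adding c.count. A small sketch of that aggregation behaviour (field subset only, names illustrative):

// Sketch of the CompactionStats aggregation semantics shown above.
#include <cassert>
#include <cstdint>

struct CompactionStats {
  uint64_t micros = 0;
  int64_t bytes_written = 0;
  int count = 0;
  void Add(const CompactionStats& c) {
    micros += c.micros;
    bytes_written += c.bytes_written;
    count += 1;  // one call == one finished compaction
  }
};

int main() {
  CompactionStats level1_totals;
  CompactionStats one_run;
  one_run.micros = 2500;
  one_run.bytes_written = 4 << 20;  // 4 MB written by this compaction
  level1_totals.Add(one_run);
  level1_totals.Add(one_run);
  assert(level1_totals.count == 2);
  assert(level1_totals.bytes_written == (8 << 20));
}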
+ std::vector<uint64_t> stall_micros_; + std::vector<uint64_t> stall_counts_; + std::vector<uint64_t> stall_leveln_slowdown_; + std::vector<uint64_t> stall_leveln_slowdown_count_; + + int number_levels_; + Statistics* statistics_; + Env* env_; + uint64_t started_at_; +}; + +} // namespace rocksdb diff --git a/util/statistics.cc b/util/statistics.cc index f86eb2c543..4fc2400185 100644 --- a/util/statistics.cc +++ b/util/statistics.cc @@ -14,25 +14,23 @@ std::shared_ptr<Statistics> CreateDBStatistics() { return std::make_shared<StatisticsImpl>(); } -StatisticsImpl::StatisticsImpl() - : tickers_(TICKER_ENUM_MAX), - histograms_(HISTOGRAM_ENUM_MAX) {} +StatisticsImpl::StatisticsImpl() {} StatisticsImpl::~StatisticsImpl() {} long StatisticsImpl::getTickerCount(Tickers tickerType) { assert(tickerType < TICKER_ENUM_MAX); - return tickers_[tickerType]; + return tickers_[tickerType].value; } void StatisticsImpl::setTickerCount(Tickers tickerType, uint64_t count) { assert(tickerType < TICKER_ENUM_MAX); - tickers_[tickerType] = count; + tickers_[tickerType].value = count; } void StatisticsImpl::recordTick(Tickers tickerType, uint64_t count) { assert(tickerType < TICKER_ENUM_MAX); - tickers_[tickerType] += count; + tickers_[tickerType].value += count; } void StatisticsImpl::measureTime(Histograms histogramType, uint64_t value) { diff --git a/util/statistics.h b/util/statistics.h index f598bdbf90..d8cb36e0a6 100644 --- a/util/statistics.h +++ b/util/statistics.h @@ -28,8 +28,18 @@ class StatisticsImpl : public Statistics { HistogramData* const data); private: - std::vector tickers_; - std::vector histograms_; + struct Ticker { + Ticker() : value(uint_fast64_t()) {} + + std::atomic_uint_fast64_t value; + // Pad the structure so that its size is 64 bytes. A plain array of + // std::atomic_uint_fast64_t results in huge performance degradation + // due to false sharing. + char padding[64 - sizeof(std::atomic_uint_fast64_t)]; + }; + + Ticker tickers_[TICKER_ENUM_MAX] __attribute__((aligned(64))); + HistogramImpl histograms_[HISTOGRAM_ENUM_MAX] __attribute__((aligned(64))); }; // Utility functions
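The Ticker struct above exists to keep each atomic counter on its own cache line: without the padding, adjacent tickers share a line and concurrent recordTick calls on different tickers invalidate each other's caches (false sharing). A minimal sketch of the same idea, assuming the usual 8-byte std::atomic_uint_fast64_t and 64-byte cache lines as the patch does:

// Illustrative padded counter; names are not the RocksDB ones.
#include <atomic>
#include <cstdint>

struct PaddedCounter {
  std::atomic_uint_fast64_t value{0};
  // Fill the rest of a 64-byte cache line (64 is assumed, as in the patch).
  char padding[64 - sizeof(std::atomic_uint_fast64_t)];
};

static_assert(sizeof(PaddedCounter) == 64, "one counter per cache line");

PaddedCounter counters[4] __attribute__((aligned(64)));

int main() {
  // Updates to counters[0] and counters[1] now touch different cache lines,
  // so threads bumping different counters do not contend.
  counters[0].value.fetch_add(1, std::memory_order_relaxed);
  counters[1].value.fetch_add(1, std::memory_order_relaxed);
  return static_cast<int>(counters[0].value.load());
}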