diff --git a/include/utilities/backupable_db.h b/include/utilities/backupable_db.h
index 1ec9e89662..8f85e06147 100644
--- a/include/utilities/backupable_db.h
+++ b/include/utilities/backupable_db.h
@@ -61,6 +61,16 @@ struct BackupableDBOptions {
   // Default: true
   bool backup_log_files;
 
+  // Max bytes that can be transferred in a second during backup.
+  // If 0, go as fast as you can
+  // Default: 0
+  uint64_t backup_rate_limit;
+
+  // Max bytes that can be transferred in a second during restore.
+  // If 0, go as fast as you can
+  // Default: 0
+  uint64_t restore_rate_limit;
+
   void Dump(Logger* logger) const;
 
   explicit BackupableDBOptions(const std::string& _backup_dir,
@@ -68,14 +78,18 @@ struct BackupableDBOptions {
                                bool _share_table_files = true,
                                Logger* _info_log = nullptr, bool _sync = true,
                                bool _destroy_old_data = false,
-                               bool _backup_log_files = true)
+                               bool _backup_log_files = true,
+                               uint64_t _backup_rate_limit = 0,
+                               uint64_t _restore_rate_limit = 0)
       : backup_dir(_backup_dir),
         backup_env(_backup_env),
         share_table_files(_share_table_files),
         info_log(_info_log),
         sync(_sync),
         destroy_old_data(_destroy_old_data),
-        backup_log_files(_backup_log_files) {}
+        backup_log_files(_backup_log_files),
+        backup_rate_limit(_backup_rate_limit),
+        restore_rate_limit(_restore_rate_limit) {}
 };
 
 struct RestoreOptions {
diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc
index a784275330..32c3b84816 100644
--- a/utilities/backupable/backupable_db.cc
+++ b/utilities/backupable/backupable_db.cc
@@ -26,17 +26,60 @@
 
 namespace rocksdb {
 
+namespace {
+class RateLimiter {
+ public:
+  RateLimiter(Env* env, uint64_t max_bytes_per_second, uint64_t bytes_per_check)
+      : env_(env),
+        max_bytes_per_second_(max_bytes_per_second),
+        bytes_per_check_(bytes_per_check),
+        micros_start_time_(env->NowMicros()),
+        bytes_since_start_(0) {}
+
+  void ReportAndWait(uint64_t bytes_since_last_call) {
+    bytes_since_start_ += bytes_since_last_call;
+    if (bytes_since_start_ < bytes_per_check_) {
+      // not enough bytes to be rate-limited
+      return;
+    }
+
+    uint64_t now = env_->NowMicros();
+    uint64_t interval = now - micros_start_time_;
+    uint64_t should_take_micros =
+        (bytes_since_start_ * kMicrosInSecond) / max_bytes_per_second_;
+
+    if (should_take_micros > interval) {
+      env_->SleepForMicroseconds(should_take_micros - interval);
+      now = env_->NowMicros();
+    }
+    // reset interval
+    micros_start_time_ = now;
+    bytes_since_start_ = 0;
+  }
+
+ private:
+  Env* env_;
+  uint64_t max_bytes_per_second_;
+  uint64_t bytes_per_check_;
+  uint64_t micros_start_time_;
+  uint64_t bytes_since_start_;
+  static const uint64_t kMicrosInSecond = 1000 * 1000LL;
+};
+}  // namespace
+
 void BackupableDBOptions::Dump(Logger* logger) const {
-  Log(logger, "       Options.backup_dir: %s", backup_dir.c_str());
-  Log(logger, "       Options.backup_env: %p", backup_env);
-  Log(logger, "Options.share_table_files: %d",
+  Log(logger, "        Options.backup_dir: %s", backup_dir.c_str());
+  Log(logger, "        Options.backup_env: %p", backup_env);
+  Log(logger, " Options.share_table_files: %d",
       static_cast<int>(share_table_files));
-  Log(logger, "         Options.info_log: %p", info_log);
-  Log(logger, "             Options.sync: %d", static_cast<int>(sync));
-  Log(logger, " Options.destroy_old_data: %d",
+  Log(logger, "          Options.info_log: %p", info_log);
+  Log(logger, "              Options.sync: %d", static_cast<int>(sync));
+  Log(logger, "  Options.destroy_old_data: %d",
       static_cast<int>(destroy_old_data));
-  Log(logger, " Options.backup_log_files: %d",
+  Log(logger, "  Options.backup_log_files: %d",
       static_cast<int>(backup_log_files));
+  Log(logger, " Options.backup_rate_limit: %" PRIu64, backup_rate_limit);
+  Log(logger, "Options.restore_rate_limit: %" PRIu64, restore_rate_limit);
 }
 
 // -------- BackupEngineImpl class ---------
@@ -170,6 +213,7 @@ class BackupEngineImpl : public BackupEngine {
                   Env* src_env,
                   Env* dst_env,
                   bool sync,
+                  RateLimiter* rate_limiter,
                   uint64_t* size = nullptr,
                   uint32_t* checksum_value = nullptr,
                   uint64_t size_limit = 0);
@@ -179,6 +223,7 @@ class BackupEngineImpl : public BackupEngine {
                     bool shared,
                     const std::string& src_dir,
                     const std::string& src_fname,  // starts with "/"
+                    RateLimiter* rate_limiter,
                     uint64_t size_limit = 0);
 
   Status CalculateChecksum(const std::string& src,
@@ -209,7 +254,8 @@ class BackupEngineImpl : public BackupEngine {
   unique_ptr<Directory> meta_directory_;
   unique_ptr<Directory> private_directory_;
 
-  static const size_t copy_file_buffer_size_ = 5 * 1024 * 1024LL;  // 5MB
+  static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL;  // 5MB
+  size_t copy_file_buffer_size_;
 };
 
 BackupEngine* BackupEngine::NewBackupEngine(
@@ -222,9 +268,8 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env,
     : stop_backup_(false),
      options_(options),
      db_env_(db_env),
-      backup_env_(options.backup_env != nullptr ? options.backup_env
-                                                : db_env_) {
-
+      backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_),
+      copy_file_buffer_size_(kDefaultCopyFileBufferSize) {
   options_.Dump(options_.info_log);
 
   // create all the dirs we need
@@ -350,6 +395,13 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
   s = backup_env_->CreateDir(
       GetAbsolutePath(GetPrivateFileRel(new_backup_id, true)));
 
+  unique_ptr<RateLimiter> rate_limiter;
+  if (options_.backup_rate_limit > 0) {
+    copy_file_buffer_size_ = options_.backup_rate_limit / 10;
+    rate_limiter.reset(new RateLimiter(db_env_, options_.backup_rate_limit,
+                                       copy_file_buffer_size_));
+  }
+
   // copy live_files
   for (size_t i = 0; s.ok() && i < live_files.size(); ++i) {
     uint64_t number;
@@ -371,6 +423,7 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
                      options_.share_table_files && type == kTableFile,
                      db->GetName(), /* src_dir */
                      live_files[i], /* src_fname */
+                     rate_limiter.get(),
                      (type == kDescriptorFile) ? manifest_file_size : 0);
   }
 
@@ -383,7 +436,8 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
                        &new_backup,
                        false, /* not shared */
                        db->GetOptions().wal_dir,
-                       live_wal_files[i]->PathName());
+                       live_wal_files[i]->PathName(),
+                       rate_limiter.get());
     }
   }
 
@@ -527,6 +581,12 @@ Status BackupEngineImpl::RestoreDBFromBackup(
     DeleteChildren(db_dir);
   }
 
+  unique_ptr<RateLimiter> rate_limiter;
+  if (options_.restore_rate_limit > 0) {
+    copy_file_buffer_size_ = options_.restore_rate_limit / 10;
+    rate_limiter.reset(new RateLimiter(db_env_, options_.restore_rate_limit,
+                                       copy_file_buffer_size_));
+  }
   Status s;
   for (auto& file : backup.GetFiles()) {
     std::string dst;
@@ -551,7 +611,7 @@ Status BackupEngineImpl::RestoreDBFromBackup(
     Log(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str());
     uint32_t checksum_value;
     s = CopyFile(GetAbsolutePath(file), dst, backup_env_, db_env_, false,
-                 nullptr /* size */, &checksum_value);
+                 rate_limiter.get(), nullptr /* size */, &checksum_value);
     if (!s.ok()) {
       break;
     }
@@ -631,7 +691,8 @@ Status BackupEngineImpl::PutLatestBackupFileContents(uint32_t latest_backup) {
 
 Status BackupEngineImpl::CopyFile(const std::string& src,
                                   const std::string& dst, Env* src_env,
-                                  Env* dst_env, bool sync, uint64_t* size,
+                                  Env* dst_env, bool sync,
+                                  RateLimiter* rate_limiter, uint64_t* size,
                                   uint32_t* checksum_value,
                                   uint64_t size_limit) {
   Status s;
@@ -684,6 +745,9 @@ Status BackupEngineImpl::CopyFile(const std::string& src,
                                       data.size());
     }
     s = dst_file->Append(data);
+    if (rate_limiter != nullptr) {
+      rate_limiter->ReportAndWait(data.size());
+    }
   } while (s.ok() && data.size() > 0 && size_limit > 0);
 
   if (s.ok() && sync) {
@@ -697,6 +761,7 @@ Status BackupEngineImpl::CopyFile(const std::string& src,
 Status BackupEngineImpl::BackupFile(BackupID backup_id, BackupMeta* backup,
                                     bool shared, const std::string& src_dir,
                                     const std::string& src_fname,
+                                    RateLimiter* rate_limiter,
                                     uint64_t size_limit) {
 
   assert(src_fname.size() > 0 && src_fname[0] == '/');
@@ -732,6 +797,7 @@ Status BackupEngineImpl::BackupFile(BackupID backup_id, BackupMeta* backup,
                  db_env_,
                  backup_env_,
                  options_.sync,
+                 rate_limiter,
                  &size,
                  &checksum_value,
                  size_limit);
diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc
index c5ee445a6c..88c4af0d36 100644
--- a/utilities/backupable/backupable_db_test.cc
+++ b/utilities/backupable/backupable_db_test.cc
@@ -293,13 +293,16 @@ class FileManager : public EnvWrapper {
 };  // FileManager
 
 // utility functions
-static void FillDB(DB* db, int from, int to) {
+static size_t FillDB(DB* db, int from, int to) {
+  size_t bytes_written = 0;
   for (int i = from; i < to; ++i) {
     std::string key = "testkey" + std::to_string(i);
     std::string value = "testvalue" + std::to_string(i);
+    bytes_written += key.size() + value.size();
 
     ASSERT_OK(db->Put(WriteOptions(), Slice(key), Slice(value)));
   }
+  return bytes_written;
 }
 
 static void AssertExists(DB* db, int from, int to) {
@@ -780,8 +783,8 @@ TEST(BackupableDBTest, DeleteTmpFiles) {
 }
 
 TEST(BackupableDBTest, KeepLogFiles) {
-  // basically infinite
   backupable_options_->backup_log_files = false;
+  // basically infinite
   options_.WAL_ttl_seconds = 24 * 60 * 60;
   OpenBackupableDB(true);
   FillDB(db_.get(), 0, 100);
@@ -800,6 +803,47 @@ TEST(BackupableDBTest, KeepLogFiles) {
   AssertBackupConsistency(0, 0, 500, 600, true);
 }
 
+TEST(BackupableDBTest, RateLimiting) {
+  uint64_t const KB = 1024 * 1024;
+  size_t const kMicrosPerSec = 1000 * 1000LL;
+
+  std::vector<std::pair<uint64_t, uint64_t>> limits(
+      {{KB, 5 * KB}, {2 * KB, 3 * KB}});
+
+  for (const auto& limit : limits) {
+    // destroy old data
+    DestroyDB(dbname_, Options());
+
+    backupable_options_->backup_rate_limit = limit.first;
+    backupable_options_->restore_rate_limit = limit.second;
+    options_.compression = kNoCompression;
+    OpenBackupableDB(true);
+    size_t bytes_written = FillDB(db_.get(), 0, 100000);
+
+    auto start_backup = env_->NowMicros();
+    ASSERT_OK(db_->CreateNewBackup(false));
+    auto backup_time = env_->NowMicros() - start_backup;
+    auto rate_limited_backup_time = (bytes_written * kMicrosPerSec) /
+                                    backupable_options_->backup_rate_limit;
+    ASSERT_GT(backup_time, 0.9 * rate_limited_backup_time);
+    ASSERT_LT(backup_time, 1.1 * rate_limited_backup_time);
+
+    CloseBackupableDB();
+
+    OpenRestoreDB();
+    auto start_restore = env_->NowMicros();
+    ASSERT_OK(restore_db_->RestoreDBFromLatestBackup(dbname_, dbname_));
+    auto restore_time = env_->NowMicros() - start_restore;
+    CloseRestoreDB();
+    auto rate_limited_restore_time = (bytes_written * kMicrosPerSec) /
+                                     backupable_options_->restore_rate_limit;
+    ASSERT_GT(restore_time, 0.9 * rate_limited_restore_time);
+    ASSERT_LT(restore_time, 1.1 * rate_limited_restore_time);
+
+    AssertBackupConsistency(0, 0, 100000, 100010);
+  }
+}
+
 }  // anon namespace
 }  // namespace rocksdb
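
For reference, a minimal usage sketch of the two new options follows. It is not part of the change: the paths and the 1 MB/s / 2 MB/s figures are made up, and it assumes the existing BackupableDB wrapper declared in utilities/backupable_db.h. Leaving either limit at 0 (the default) keeps copying unthrottled; restore_rate_limit is applied the same way when the same options are passed to RestoreBackupableDB.

#include <cassert>

#include "rocksdb/db.h"
#include "utilities/backupable_db.h"

int main() {
  // Open (or create) the database to back up. Paths here are illustrative.
  rocksdb::Options db_options;
  db_options.create_if_missing = true;
  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(db_options, "/tmp/rate_limit_example", &db);
  assert(s.ok());

  // Throttle backups to ~1 MB/s and restores to ~2 MB/s (arbitrary values).
  rocksdb::BackupableDBOptions backup_options("/tmp/rate_limit_example_backup");
  backup_options.backup_rate_limit = 1024 * 1024;       // bytes per second
  backup_options.restore_rate_limit = 2 * 1024 * 1024;  // bytes per second

  // BackupableDB wraps the open DB; with backup_rate_limit set,
  // CreateNewBackup() pauses between buffer-sized writes so the average
  // copy rate stays under the limit.
  rocksdb::BackupableDB backupable_db(db, backup_options);
  s = backupable_db.CreateNewBackup(true /* flush_before_backup */);
  assert(s.ok());
  return 0;
}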