mirror of
https://github.com/XRPLF/rippled.git
synced 2025-12-06 17:27:55 +00:00
[RocksDB] Clear Archive WAL files
Summary: WAL files are moved to archive directory and clear only at DB::Open. Can lead to a lot of space consumption in a Database. Added logic to periodically clear Archive Directory too. Test Plan: make all check + add unit test Reviewers: dhruba, heyongqiang Reviewed By: heyongqiang CC: leveldb Differential Revision: https://reviews.facebook.net/D10617
This commit is contained in:
@@ -159,6 +159,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
|
||||
logger_(nullptr),
|
||||
disable_delete_obsolete_files_(false),
|
||||
delete_obsolete_files_last_run_(0),
|
||||
purge_wal_files_last_run_(0),
|
||||
stall_level0_slowdown_(0),
|
||||
stall_memtable_compaction_(0),
|
||||
stall_level0_num_files_(0),
|
||||
@@ -390,16 +391,19 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
|
||||
// record the files to be evicted from the cache
|
||||
state.files_to_evict.push_back(number);
|
||||
}
|
||||
Log(options_.info_log, "Delete type=%d #%lld\n",
|
||||
int(type),
|
||||
static_cast<unsigned long long>(number));
|
||||
Log(options_.info_log, "Delete type=%d #%lu", int(type), number);
|
||||
if (type == kLogFile && options_.WAL_ttl_seconds > 0) {
|
||||
Status st = env_->RenameFile(LogFileName(dbname_, number),
|
||||
ArchivedLogFileName(dbname_, number));
|
||||
Status st = env_->RenameFile(
|
||||
LogFileName(dbname_, number),
|
||||
ArchivedLogFileName(dbname_, number)
|
||||
);
|
||||
|
||||
if (!st.ok()) {
|
||||
Log(options_.info_log, "RenameFile type=%d #%lld FAILED\n",
|
||||
Log(
|
||||
options_.info_log, "RenameFile type=%d #%lu FAILED",
|
||||
int(type),
|
||||
static_cast<unsigned long long>(number));
|
||||
number
|
||||
);
|
||||
}
|
||||
} else {
|
||||
Status st = env_->DeleteFile(dbname_ + "/" + state.allfiles[i]);
|
||||
@@ -428,6 +432,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
|
||||
env_->DeleteFile(dbname_ + "/" + to_delete);
|
||||
}
|
||||
}
|
||||
PurgeObsoleteWALFiles();
|
||||
}
|
||||
|
||||
void DBImpl::EvictObsoleteFiles(DeletionState& state) {
|
||||
@@ -442,34 +447,36 @@ void DBImpl::DeleteObsoleteFiles() {
|
||||
FindObsoleteFiles(deletion_state);
|
||||
PurgeObsoleteFiles(deletion_state);
|
||||
EvictObsoleteFiles(deletion_state);
|
||||
PurgeObsoleteWALFiles();
|
||||
}
|
||||
|
||||
void DBImpl::PurgeObsoleteWALFiles() {
|
||||
int64_t current_time;
|
||||
Status s = env_->GetCurrentTime(¤t_time);
|
||||
uint64_t now_micros = static_cast<uint64_t>(current_time);
|
||||
assert(s.ok());
|
||||
|
||||
if (options_.WAL_ttl_seconds != ULONG_MAX && options_.WAL_ttl_seconds > 0) {
|
||||
std::vector<std::string> WALFiles;
|
||||
std::string archivalDir = ArchivalDirectory(dbname_);
|
||||
env_->GetChildren(archivalDir, &WALFiles);
|
||||
int64_t currentTime;
|
||||
const Status status = env_->GetCurrentTime(¤tTime);
|
||||
assert(status.ok());
|
||||
for (const auto& f : WALFiles) {
|
||||
uint64_t fileMTime;
|
||||
const std::string filePath = archivalDir + "/" + f;
|
||||
const Status s = env_->GetFileModificationTime(filePath, &fileMTime);
|
||||
if (s.ok()) {
|
||||
if (status.ok() &&
|
||||
(currentTime - fileMTime > options_.WAL_ttl_seconds)) {
|
||||
Status delStatus = env_->DeleteFile(filePath);
|
||||
if (!delStatus.ok()) {
|
||||
Log(options_.info_log,
|
||||
"Failed Deleting a WAL file Error : i%s",
|
||||
delStatus.ToString().c_str());
|
||||
}
|
||||
if (purge_wal_files_last_run_ + options_.WAL_ttl_seconds > now_micros) {
|
||||
return;
|
||||
}
|
||||
std::vector<std::string> wal_files;
|
||||
std::string archival_dir = ArchivalDirectory(dbname_);
|
||||
env_->GetChildren(archival_dir, &wal_files);
|
||||
for (const auto& f : wal_files) {
|
||||
uint64_t file_m_time;
|
||||
const std::string file_path = archival_dir + "/" + f;
|
||||
const Status s = env_->GetFileModificationTime(file_path, &file_m_time);
|
||||
if (s.ok() && (now_micros - file_m_time > options_.WAL_ttl_seconds)) {
|
||||
Status status = env_->DeleteFile(file_path);
|
||||
if (!status.ok()) {
|
||||
Log(options_.info_log,
|
||||
"Failed Deleting a WAL file Error : i%s",
|
||||
status.ToString().c_str());
|
||||
}
|
||||
} // Ignore errors.
|
||||
}
|
||||
}
|
||||
purge_wal_files_last_run_ = now_micros;
|
||||
}
|
||||
|
||||
// If externalTable is set, then apply recovered transactions
|
||||
@@ -1197,6 +1204,10 @@ void DBImpl::BGWork(void* db) {
|
||||
reinterpret_cast<DBImpl*>(db)->BackgroundCall();
|
||||
}
|
||||
|
||||
void DBImpl::TEST_PurgeObsoleteteWAL() {
|
||||
PurgeObsoleteWALFiles();
|
||||
}
|
||||
|
||||
void DBImpl::BackgroundCall() {
|
||||
bool madeProgress = false;
|
||||
DeletionState deletion_state;
|
||||
@@ -1225,6 +1236,7 @@ void DBImpl::BackgroundCall() {
|
||||
PurgeObsoleteFiles(deletion_state);
|
||||
EvictObsoleteFiles(deletion_state);
|
||||
mutex_.Lock();
|
||||
|
||||
}
|
||||
|
||||
bg_compaction_scheduled_--;
|
||||
|
||||
@@ -89,6 +89,10 @@ class DBImpl : public DB {
|
||||
|
||||
// Return the current manifest file no.
|
||||
uint64_t TEST_Current_Manifest_FileNo();
|
||||
|
||||
// Trigger's a background call for testing.
|
||||
void TEST_PurgeObsoleteteWAL();
|
||||
|
||||
protected:
|
||||
Env* const env_;
|
||||
const std::string dbname_;
|
||||
@@ -265,6 +269,9 @@ class DBImpl : public DB {
|
||||
// last time when DeleteObsoleteFiles was invoked
|
||||
uint64_t delete_obsolete_files_last_run_;
|
||||
|
||||
// last time when PurgeObsoleteWALFiles ran.
|
||||
uint64_t purge_wal_files_last_run_;
|
||||
|
||||
// These count the number of microseconds for which MakeRoomForWrite stalls.
|
||||
uint64_t stall_level0_slowdown_;
|
||||
uint64_t stall_memtable_compaction_;
|
||||
|
||||
@@ -2566,20 +2566,20 @@ TEST(DBTest, CompactOnFlush) {
|
||||
db_->ReleaseSnapshot(snapshot1);
|
||||
}
|
||||
|
||||
void ListLogFiles(Env* env,
|
||||
const std::string& path,
|
||||
std::vector<uint64_t>* logFiles) {
|
||||
std::vector<std::uint64_t> ListLogFiles(Env* env, const std::string& path) {
|
||||
std::vector<std::string> files;
|
||||
std::vector<uint64_t> log_files;
|
||||
env->GetChildren(path, &files);
|
||||
uint64_t number;
|
||||
FileType type;
|
||||
for (size_t i = 0; i < files.size(); ++i) {
|
||||
if (ParseFileName(files[i], &number, &type)) {
|
||||
if (type == kLogFile) {
|
||||
logFiles->push_back(number);
|
||||
log_files.push_back(number);
|
||||
}
|
||||
}
|
||||
}
|
||||
return std::move(log_files);
|
||||
}
|
||||
|
||||
TEST(DBTest, WALArchival) {
|
||||
@@ -2598,42 +2598,52 @@ TEST(DBTest, WALArchival) {
|
||||
std::string archiveDir = ArchivalDirectory(dbname_);
|
||||
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
|
||||
for (int j = 0; j < 10; ++j) {
|
||||
ASSERT_OK(Put(Key(10*i+j), value));
|
||||
}
|
||||
|
||||
std::vector<uint64_t> logFiles;
|
||||
ListLogFiles(env_, dbname_, &logFiles);
|
||||
std::vector<uint64_t> logFiles = ListLogFiles(env_, dbname_);
|
||||
|
||||
options.create_if_missing = false;
|
||||
Reopen(&options);
|
||||
|
||||
std::vector<uint64_t> logs;
|
||||
ListLogFiles(env_, archiveDir, &logs);
|
||||
std::vector<uint64_t> logs = ListLogFiles(env_, archiveDir);
|
||||
std::set<uint64_t> archivedFiles(logs.begin(), logs.end());
|
||||
|
||||
for (std::vector<uint64_t>::iterator it = logFiles.begin();
|
||||
it != logFiles.end();
|
||||
++it) {
|
||||
ASSERT_TRUE(archivedFiles.find(*it) != archivedFiles.end());
|
||||
for (auto& log : logFiles) {
|
||||
ASSERT_TRUE(archivedFiles.find(log) != archivedFiles.end());
|
||||
}
|
||||
}
|
||||
|
||||
// REOPEN database with 0 TTL. all files should have been deleted.
|
||||
std::vector<uint64_t> logFiles;
|
||||
ListLogFiles(env_, archiveDir, &logFiles);
|
||||
std::vector<uint64_t> logFiles = ListLogFiles(env_, archiveDir);
|
||||
ASSERT_TRUE(logFiles.size() > 0);
|
||||
options.WAL_ttl_seconds = 1;
|
||||
env_->SleepForMicroseconds(2*1000*1000);
|
||||
Reopen(&options);
|
||||
|
||||
logFiles.clear();
|
||||
ListLogFiles(env_, archiveDir, &logFiles);
|
||||
logFiles = ListLogFiles(env_, archiveDir);
|
||||
ASSERT_TRUE(logFiles.size() == 0);
|
||||
|
||||
}
|
||||
|
||||
TEST(DBTest, WALClear) {
|
||||
Options options = CurrentOptions();
|
||||
options.create_if_missing = true;
|
||||
options.WAL_ttl_seconds = 1;
|
||||
|
||||
for (int j = 0; j < 10; ++j)
|
||||
for (int i = 0; i < 10; ++i)
|
||||
ASSERT_OK(Put(Key(10*i+j), DummyString(1024)));
|
||||
Reopen(&options);
|
||||
std::string archive_dir = ArchivalDirectory(dbname_);
|
||||
std::vector<std::uint64_t> log_files = ListLogFiles(env_, archive_dir);
|
||||
ASSERT_TRUE(!log_files.empty());
|
||||
env_->SleepForMicroseconds(2 * 1000 * 1000);
|
||||
dbfull()->TEST_PurgeObsoleteteWAL();
|
||||
log_files = ListLogFiles(env_, archive_dir);
|
||||
ASSERT_TRUE(log_files.empty());
|
||||
}
|
||||
|
||||
void ExpectRecords(
|
||||
const int expected_no_records,
|
||||
std::unique_ptr<TransactionLogIterator>& iter) {
|
||||
@@ -2662,7 +2672,7 @@ TEST(DBTest, TransactionLogIterator) {
|
||||
ExpectRecords(3, iter);
|
||||
}
|
||||
Reopen(&options);
|
||||
{
|
||||
env_->SleepForMicroseconds(2 * 1000 * 1000);{
|
||||
Put("key4", DummyString(1024));
|
||||
Put("key5", DummyString(1024));
|
||||
Put("key6", DummyString(1024));
|
||||
|
||||
Reference in New Issue
Block a user