Merge branch 'master' into performance

Conflicts:
	db/db_impl.cc
	db/db_impl.h
This commit is contained in:
Dhruba Borthakur
2012-10-21 01:55:19 -07:00
2 changed files with 96 additions and 21 deletions

View File

@@ -86,6 +86,22 @@ struct DBImpl::CompactionState {
}
};
// Scratch state shared by the obsolete-file cleanup steps:
// FindObsoleteFiles() fills in 'live', 'allfiles' and the three numbers,
// PurgeObsoleteFiles() reads them and appends to 'files_to_evict', and
// EvictObsoleteFiles() consumes 'files_to_evict'.
struct DBImpl::DeletionState {
// the set of all live files that cannot be deleted
std::set<uint64_t> live;
// a list of all files that exist in the db directory
std::vector<std::string> allfiles;
// the current filenumber (manifest file number), lognumber and
// prevlognumber that correspond to the set of files in 'live'.
// NOTE: there is no constructor, so these three are indeterminate
// until they are explicitly assigned.
uint64_t filenumber, lognumber, prevlognumber;
// the list of all files to be evicted from the table cache
std::vector<uint64_t> files_to_evict;
};
// Fix user-supplied options to be reasonable
template <class T,class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
@@ -249,7 +265,11 @@ void DBImpl::MaybeIgnoreError(Status* s) const {
}
}
void DBImpl::DeleteObsoleteFiles() {
// Returns the list of live files in 'live' and the list
// of all files in the filesystem in 'allfiles'.
void DBImpl::FindObsoleteFiles(DeletionState& deletion_state) {
mutex_.AssertHeld();
// if deletion is disabled, do nothing
if (disable_delete_obsolete_files_) {
return;
@@ -268,39 +288,51 @@ void DBImpl::DeleteObsoleteFiles() {
}
// Make a set of all of the live files
std::set<uint64_t> live = pending_outputs_;
versions_->AddLiveFiles(&live);
deletion_state.live = pending_outputs_;
versions_->AddLiveFiles(&deletion_state.live);
std::vector<std::string> filenames;
env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
// set of all files in the directory
env_->GetChildren(dbname_, &deletion_state.allfiles); // Ignore errors
// store the current filenum, lognum, etc
deletion_state.filenumber = versions_->ManifestFileNumber();
deletion_state.lognumber = versions_->LogNumber();
deletion_state.prevlognumber = versions_->PrevLogNumber();
}
// Compares the files listed in state.allfiles against the live set; files
// that are not live are possibly removed. If a removed file is an sst file,
// its file number is recorded in state.files_to_evict.
// It is not necessary to hold the mutex when invoking this method.
void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
uint64_t number;
FileType type;
std::vector<std::string> old_log_files;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) {
for (size_t i = 0; i < state.allfiles.size(); i++) {
if (ParseFileName(state.allfiles[i], &number, &type)) {
bool keep = true;
switch (type) {
case kLogFile:
keep = ((number >= versions_->LogNumber()) ||
(number == versions_->PrevLogNumber()));
keep = ((number >= state.lognumber) ||
(number == state.prevlognumber));
break;
case kDescriptorFile:
// Keep my manifest file, and any newer incarnations'
// (in case there is a race that allows other incarnations)
keep = (number >= versions_->ManifestFileNumber());
keep = (number >= state.filenumber);
break;
case kTableFile:
keep = (live.find(number) != live.end());
keep = (state.live.find(number) != state.live.end());
break;
case kTempFile:
// Any temp files that are currently being written to must
// be recorded in pending_outputs_, which is inserted into "live"
keep = (live.find(number) != live.end());
keep = (state.live.find(number) != state.live.end());
break;
case kInfoLogFile:
keep = true;
if (number != 0) {
old_log_files.push_back(filenames[i]);
old_log_files.push_back(state.allfiles[i]);
}
break;
case kCurrentFile:
@@ -311,12 +343,13 @@ void DBImpl::DeleteObsoleteFiles() {
if (!keep) {
if (type == kTableFile) {
table_cache_->Evict(number);
// record the files to be evicted from the cache
state.files_to_evict.push_back(number);
}
Log(options_.info_log, "Delete type=%d #%lld\n",
int(type),
static_cast<unsigned long long>(number));
Status st = env_->DeleteFile(dbname_ + "/" + filenames[i]);
Status st = env_->DeleteFile(dbname_ + "/" + state.allfiles[i]);
if(!st.ok()) {
Log(options_.info_log, "Delete type=%d #%lld FAILED\n",
int(type),
@@ -333,13 +366,32 @@ void DBImpl::DeleteObsoleteFiles() {
std::sort(old_log_files.begin(), old_log_files.end());
for (int i = 0; i >= (old_log_file_count - KEEP_LOG_FILE_NUM); i++) {
std::string& to_delete = old_log_files.at(i);
Log(options_.info_log, "Delete type=%d %s\n",
int(kInfoLogFile), to_delete.c_str());
// Log(options_.info_log, "Delete type=%d %s\n",
// int(kInfoLogFile), to_delete.c_str());
env_->DeleteFile(dbname_ + "/" + to_delete);
}
}
}
// Evicts from the table cache every file number recorded in
// state.files_to_evict, in the order in which they were recorded.
// Asserts that mutex_ is held on entry.
void DBImpl::EvictObsoleteFiles(DeletionState& state) {
  mutex_.AssertHeld();
  const std::vector<uint64_t>& doomed = state.files_to_evict;
  for (std::vector<uint64_t>::const_iterator it = doomed.begin();
       it != doomed.end(); ++it) {
    table_cache_->Evict(*it);
  }
}
// Runs the three cleanup phases back to back: collect the live set and the
// directory listing, delete the files that are no longer needed, then evict
// the deleted table files from the table cache.
// Asserts that mutex_ is held on entry; the mutex is not released in between.
void DBImpl::DeleteObsoleteFiles() {
  mutex_.AssertHeld();
  // All intermediate results travel in deletion_state; no separate locals
  // are needed.
  DeletionState deletion_state;
  FindObsoleteFiles(deletion_state);
  PurgeObsoleteFiles(deletion_state);
  EvictObsoleteFiles(deletion_state);
}
Status DBImpl::Recover(VersionEdit* edit) {
mutex_.AssertHeld();
@@ -828,11 +880,12 @@ void DBImpl::BGWork(void* db) {
void DBImpl::BackgroundCall() {
bool madeProgress;
DeletionState deletion_state;
MutexLock l(&mutex_);
// Log(options_.info_log, "XXX BG Thread %llx process new work item", pthread_self());
assert(bg_compaction_scheduled_);
if (!shutting_down_.Acquire_Load()) {
Status s = BackgroundCompaction(&madeProgress);
Status s = BackgroundCompaction(&madeProgress, deletion_state);
if (!s.ok()) {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
@@ -847,6 +900,14 @@ void DBImpl::BackgroundCall() {
}
}
// delete unnecessary files if any, this is done outside the mutex
if (!deletion_state.live.empty()) {
mutex_.Unlock();
PurgeObsoleteFiles(deletion_state);
mutex_.Lock();
}
EvictObsoleteFiles(deletion_state);
bg_compaction_scheduled_--;
MaybeScheduleLogDBDeployStats();
@@ -860,7 +921,8 @@ void DBImpl::BackgroundCall() {
bg_cv_.SignalAll();
}
Status DBImpl::BackgroundCompaction(bool* madeProgress) {
Status DBImpl::BackgroundCompaction(bool* madeProgress,
DeletionState& deletion_state) {
*madeProgress = false;
mutex_.AssertHeld();
@@ -924,7 +986,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress) {
CleanupCompaction(compact);
versions_->ReleaseCompactionFiles(c);
c->ReleaseInputs();
DeleteObsoleteFiles();
FindObsoleteFiles(deletion_state);
*madeProgress = true;
}
delete c;