[CF] Dont reuse dropped column family IDs

Summary:
Column family IDs should be unique, even if column family is dropped. To achieve this, we save max column family in manifest.

Note that the diff is still not ready. I'm only using differential to move the patch to my Mac machine.

Test Plan: added a test to column_family_test

Reviewers: dhruba, haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D16581
This commit is contained in:
Igor Canadi
2014-03-05 12:13:44 -08:00
parent e21d5b8bbc
commit 9625acbf70
7 changed files with 94 additions and 10 deletions

View File

@@ -1497,6 +1497,9 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
return Status::OK();
}
if (edit->is_column_family_drop_) {
// if we drop column family, we have to make sure to save max column family,
// so that we don't reuse existing ID
edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
column_family_data->SetDropped();
}
@@ -1789,6 +1792,7 @@ Status VersionSet::Recover(
uint64_t last_sequence = 0;
uint64_t log_number = 0;
uint64_t prev_log_number = 0;
uint32_t max_column_family = 0;
std::unordered_map<uint32_t, Builder*> builders;
// add default column family
@@ -1918,6 +1922,10 @@ Status VersionSet::Recover(
have_next_file = true;
}
if (edit.has_max_column_family_) {
max_column_family = edit.max_column_family_;
}
if (edit.has_last_sequence_) {
last_sequence = edit.last_sequence_;
have_last_sequence = true;
@@ -1938,6 +1946,8 @@ Status VersionSet::Recover(
prev_log_number = 0;
}
column_family_set_->UpdateMaxColumnFamily(max_column_family);
MarkFileNumberUsed(prev_log_number);
MarkFileNumberUsed(log_number);
}
@@ -1981,13 +1991,15 @@ Status VersionSet::Recover(
Log(options_->info_log, "Recovered from manifest file:%s succeeded,"
"manifest_file_number is %lu, next_file_number is %lu, "
"last_sequence is %lu, log_number is %lu,"
"prev_log_number is %lu\n",
"prev_log_number is %lu,"
"max_column_family is %u\n",
manifest_filename.c_str(),
(unsigned long)manifest_file_number_,
(unsigned long)next_file_number_,
(unsigned long)last_sequence_,
(unsigned long)log_number,
(unsigned long)prev_log_number_);
(unsigned long)prev_log_number_,
column_family_set_->GetMaxColumnFamily());
for (auto cfd : *column_family_set_) {
Log(options_->info_log,
@@ -2267,6 +2279,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
last_sequence = edit.last_sequence_;
have_last_sequence = true;
}
if (edit.has_max_column_family_) {
column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_);
}
}
}
file.reset();
@@ -2315,9 +2331,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
printf(
"manifest_file_number %lu next_file_number %lu last_sequence "
"%lu prev_log_number %lu\n",
"%lu prev_log_number %lu max_column_family %u\n",
(unsigned long)manifest_file_number_, (unsigned long)next_file_number_,
(unsigned long)last_sequence, (unsigned long)prev_log_number);
(unsigned long)last_sequence, (unsigned long)prev_log_number,
column_family_set_->GetMaxColumnFamily());
}
return s;
@@ -2378,6 +2395,18 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
}
}
// save max column family to avoid reusing the same column
// family ID for two different column families
if (column_family_set_->GetMaxColumnFamily() > 0) {
VersionEdit edit;
edit.SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
std::string record;
edit.EncodeTo(&record);
Status s = log->AddRecord(record);
if (!s.ok()) {
return s;
}
}
return Status::OK();
}