@20602303. Default file permission is now 755.

git-svn-id: https://leveldb.googlecode.com/svn/trunk@20 62dab493-f737-651d-591e-8d6aee1b9529
This commit is contained in:
dgrogan@chromium.org
2011-04-12 19:38:58 +00:00
parent 9e33808a26
commit f779e7a5d8
117 changed files with 629 additions and 276 deletions

272
db/db_impl.cc Normal file → Executable file
View File

@@ -104,6 +104,9 @@ Options SanitizeOptions(const std::string& dbname,
result.info_log = new NullWritableFile;
}
}
if (result.block_cache == NULL) {
result.block_cache = NewLRUCache(8 << 20);
}
return result;
}
@@ -112,18 +115,20 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
internal_comparator_(options.comparator),
options_(SanitizeOptions(dbname, &internal_comparator_, options)),
owns_info_log_(options_.info_log != options.info_log),
owns_cache_(options_.block_cache != options.block_cache),
dbname_(dbname),
db_lock_(NULL),
shutting_down_(NULL),
bg_cv_(&mutex_),
compacting_cv_(&mutex_),
last_sequence_(0),
mem_(new MemTable(internal_comparator_)),
imm_(NULL),
logfile_(NULL),
log_(NULL),
log_number_(0),
bg_compaction_scheduled_(false),
compacting_(false) {
has_imm_.Release_Store(NULL);
// Reserve ten files or so for other uses and give the rest to TableCache.
const int table_cache_size = options.max_open_files - 10;
table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
@@ -149,6 +154,7 @@ DBImpl::~DBImpl() {
delete versions_;
delete mem_;
delete imm_;
delete log_;
delete logfile_;
delete table_cache_;
@@ -156,15 +162,15 @@ DBImpl::~DBImpl() {
if (owns_info_log_) {
delete options_.info_log;
}
if (owns_cache_) {
delete options_.block_cache;
}
}
Status DBImpl::NewDB() {
assert(log_number_ == 0);
assert(last_sequence_ == 0);
VersionEdit new_db;
new_db.SetComparatorName(user_comparator()->Name());
new_db.SetLogNumber(log_number_);
new_db.SetLogNumber(0);
new_db.SetNextFile(2);
new_db.SetLastSequence(0);
@@ -193,15 +199,6 @@ Status DBImpl::NewDB() {
return s;
}
Status DBImpl::Install(VersionEdit* edit,
uint64_t new_log_number,
MemTable* cleanup_mem) {
mutex_.AssertHeld();
edit->SetLogNumber(new_log_number);
edit->SetLastSequence(last_sequence_);
return versions_->LogAndApply(edit, cleanup_mem);
}
void DBImpl::MaybeIgnoreError(Status* s) const {
if (s->ok() || options_.paranoid_checks) {
// No change needed
@@ -216,7 +213,7 @@ void DBImpl::DeleteObsoleteFiles() {
std::set<uint64_t> live = pending_outputs_;
versions_->AddLiveFiles(&live);
versions_->CleanupLargeValueRefs(live, log_number_);
versions_->CleanupLargeValueRefs(live);
std::vector<std::string> filenames;
env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
@@ -228,7 +225,8 @@ void DBImpl::DeleteObsoleteFiles() {
bool keep = true;
switch (type) {
case kLogFile:
keep = (number == log_number_);
keep = ((number == versions_->LogNumber()) ||
(number == versions_->PrevLogNumber()));
break;
case kDescriptorFile:
// Keep my manifest file, and any newer incarnations'
@@ -296,16 +294,20 @@ Status DBImpl::Recover(VersionEdit* edit) {
}
}
s = versions_->Recover(&log_number_, &last_sequence_);
s = versions_->Recover();
if (s.ok()) {
// Recover from the log file named in the descriptor
// Recover from the log files named in the descriptor
SequenceNumber max_sequence(0);
if (log_number_ != 0) { // log_number_ == 0 indicates initial empty state
s = RecoverLogFile(log_number_, edit, &max_sequence);
if (versions_->PrevLogNumber() != 0) { // log#==0 means no prev log
s = RecoverLogFile(versions_->PrevLogNumber(), edit, &max_sequence);
}
if (s.ok() && versions_->LogNumber() != 0) { // log#==0 for initial state
s = RecoverLogFile(versions_->LogNumber(), edit, &max_sequence);
}
if (s.ok()) {
last_sequence_ =
last_sequence_ > max_sequence ? last_sequence_ : max_sequence;
if (versions_->LastSequence() < max_sequence) {
versions_->SetLastSequence(max_sequence);
}
}
}
@@ -407,56 +409,58 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit) {
mutex_.AssertHeld();
const uint64_t start_micros = env_->NowMicros();
FileMetaData meta;
meta.number = versions_->NewFileNumber();
pending_outputs_.insert(meta.number);
Iterator* iter = mem->NewIterator();
Log(env_, options_.info_log, "Level-0 table #%llu: started",
(unsigned long long) meta.number);
Status s = BuildTable(dbname_, env_, options_, table_cache_,
iter, &meta, edit);
Status s;
{
mutex_.Unlock();
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta, edit);
mutex_.Lock();
}
Log(env_, options_.info_log, "Level-0 table #%llu: %lld bytes %s",
(unsigned long long) meta.number,
(unsigned long long) meta.file_size,
s.ToString().c_str());
delete iter;
pending_outputs_.erase(meta.number);
CompactionStats stats;
stats.micros = env_->NowMicros() - start_micros;
stats.bytes_written = meta.file_size;
stats_[0].Add(stats);
return s;
}
Status DBImpl::CompactMemTable() {
mutex_.AssertHeld();
WritableFile* lfile = NULL;
uint64_t new_log_number = versions_->NewFileNumber();
VersionEdit edit;
assert(imm_ != NULL);
assert(compacting_);
// Save the contents of the memtable as a new Table
Status s = WriteLevel0Table(mem_, &edit);
if (s.ok()) {
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
}
VersionEdit edit;
Status s = WriteLevel0Table(imm_, &edit);
// Save a new descriptor with the new table and log number.
// Replace immutable memtable with the generated Table
if (s.ok()) {
s = Install(&edit, new_log_number, mem_);
edit.SetPrevLogNumber(0);
s = versions_->LogAndApply(&edit, imm_);
}
if (s.ok()) {
// Commit to the new state
mem_ = new MemTable(internal_comparator_);
delete log_;
delete logfile_;
logfile_ = lfile;
log_ = new log::Writer(lfile);
log_number_ = new_log_number;
imm_ = NULL;
has_imm_.Release_Store(NULL);
DeleteObsoleteFiles();
MaybeScheduleCompaction();
} else {
delete lfile;
env_->DeleteFile(LogFileName(dbname_, new_log_number));
}
compacting_cv_.SignalAll(); // Wake up waiter even if there was an error
return s;
}
@@ -485,7 +489,17 @@ void DBImpl::TEST_CompactRange(
Status DBImpl::TEST_CompactMemTable() {
MutexLock l(&mutex_);
return CompactMemTable();
Status s = MakeRoomForWrite(true /* force compaction */);
if (s.ok()) {
// Wait until the compaction completes
while (imm_ != NULL && bg_error_.ok()) {
compacting_cv_.Wait();
}
if (imm_ != NULL) {
s = bg_error_;
}
}
return s;
}
void DBImpl::MaybeScheduleCompaction() {
@@ -496,7 +510,7 @@ void DBImpl::MaybeScheduleCompaction() {
// Some other thread is running a compaction. Do not conflict with it.
} else if (shutting_down_.Acquire_Load()) {
// DB is being deleted; no more background compactions
} else if (!versions_->NeedsCompaction()) {
} else if (imm_ == NULL && !versions_->NeedsCompaction()) {
// No work to be done
} else {
bg_compaction_scheduled_ = true;
@@ -525,6 +539,16 @@ void DBImpl::BackgroundCall() {
void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld();
assert(!compacting_);
if (imm_ != NULL) {
compacting_ = true;
CompactMemTable();
compacting_ = false;
compacting_cv_.SignalAll();
return;
}
Compaction* c = versions_->PickCompaction();
if (c == NULL) {
// Nothing to do
@@ -539,7 +563,7 @@ void DBImpl::BackgroundCompaction() {
c->edit()->DeleteFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest);
status = Install(c->edit(), log_number_, NULL);
status = versions_->LogAndApply(c->edit(), NULL);
Log(env_, options_.info_log, "Moved #%lld to level-%d %lld bytes %s\n",
static_cast<unsigned long long>(f->number),
c->level() + 1,
@@ -680,7 +704,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
}
compact->outputs.clear();
Status s = Install(compact->compaction->edit(), log_number_, NULL);
Status s = versions_->LogAndApply(compact->compaction->edit(), NULL);
if (s.ok()) {
compact->compaction->ReleaseInputs();
DeleteObsoleteFiles();
@@ -694,6 +718,9 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
}
Status DBImpl::DoCompactionWork(CompactionState* compact) {
const uint64_t start_micros = env_->NowMicros();
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
Log(env_, options_.info_log, "Compacting %d@%d + %d@%d files",
compact->compaction->num_input_files(0),
compact->compaction->level(),
@@ -704,7 +731,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
assert(compact->builder == NULL);
assert(compact->outfile == NULL);
if (snapshots_.empty()) {
compact->smallest_snapshot = last_sequence_;
compact->smallest_snapshot = versions_->LastSequence();
} else {
compact->smallest_snapshot = snapshots_.oldest()->number_;
}
@@ -721,6 +748,18 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
// Prioritize immutable compaction work
if (has_imm_.NoBarrier_Load() != NULL) {
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();
if (imm_ != NULL) {
CompactMemTable();
compacting_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
}
mutex_.Unlock();
imm_micros += (env_->NowMicros() - imm_start);
}
Slice key = input->key();
InternalKey tmp_internal_key;
tmp_internal_key.DecodeFrom(key);
@@ -835,7 +874,19 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
delete input;
input = NULL;
CompactionStats stats;
stats.micros = env_->NowMicros() - start_micros - imm_micros;
for (int which = 0; which < 2; which++) {
for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
stats.bytes_read += compact->compaction->input(which, i)->file_size;
}
}
for (int i = 0; i < compact->outputs.size(); i++) {
stats.bytes_written += compact->outputs[i].file_size;
}
mutex_.Lock();
stats_[compact->compaction->level() + 1].Add(stats);
if (status.ok()) {
status = InstallCompactionResults(compact);
@@ -848,11 +899,14 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
SequenceNumber* latest_snapshot) {
mutex_.Lock();
*latest_snapshot = last_sequence_;
*latest_snapshot = versions_->LastSequence();
// Collect together all needed child iterators
std::vector<Iterator*> list;
list.push_back(mem_->NewIterator());
if (imm_ != NULL) {
list.push_back(imm_->NewIterator());
}
versions_->current()->AddIterators(options, &list);
Iterator* internal_iter =
NewMergingIterator(&internal_comparator_, &list[0], list.size());
@@ -912,7 +966,7 @@ void DBImpl::Unref(void* arg1, void* arg2) {
const Snapshot* DBImpl::GetSnapshot() {
MutexLock l(&mutex_);
return snapshots_.New(last_sequence_);
return snapshots_.New(versions_->LastSequence());
}
void DBImpl::ReleaseSnapshot(const Snapshot* s) {
@@ -935,17 +989,16 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
WriteBatch* final = NULL;
{
MutexLock l(&mutex_);
if (!bg_error_.ok()) {
status = bg_error_;
} else if (mem_->ApproximateMemoryUsage() > options_.write_buffer_size) {
status = CompactMemTable();
status = MakeRoomForWrite(false); // May temporarily release lock and wait
uint64_t last_sequence = versions_->LastSequence();
if (status.ok()) {
status = HandleLargeValues(last_sequence + 1, updates, &final);
}
if (status.ok()) {
status = HandleLargeValues(last_sequence_ + 1, updates, &final);
}
if (status.ok()) {
WriteBatchInternal::SetSequence(final, last_sequence_ + 1);
last_sequence_ += WriteBatchInternal::Count(final);
WriteBatchInternal::SetSequence(final, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(final);
versions_->SetLastSequence(last_sequence);
// Add to log and apply to memtable
status = log_->AddRecord(WriteBatchInternal::Contents(final));
@@ -959,7 +1012,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
if (options.post_write_snapshot != NULL) {
*options.post_write_snapshot =
status.ok() ? snapshots_.New(last_sequence_) : NULL;
status.ok() ? snapshots_.New(last_sequence) : NULL;
}
}
if (final != updates) {
@@ -969,6 +1022,54 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
return status;
}
Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld();
Status s;
while (true) {
if (!bg_error_.ok()) {
// Yield previous error
s = bg_error_;
break;
} else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
// There is room in current memtable
break;
} else if (imm_ != NULL) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
compacting_cv_.Wait();
} else {
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* lfile = NULL;
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
if (!s.ok()) {
break;
}
VersionEdit edit;
edit.SetPrevLogNumber(versions_->LogNumber());
edit.SetLogNumber(new_log_number);
s = versions_->LogAndApply(&edit, NULL);
if (!s.ok()) {
delete lfile;
env_->DeleteFile(LogFileName(dbname_, new_log_number));
break;
}
delete log_;
delete logfile_;
logfile_ = lfile;
log_ = new log::Writer(lfile);
imm_ = mem_;
has_imm_.Release_Store(imm_);
mem_ = new MemTable(internal_comparator_);
force = false; // Do not force another compaction if have room
MaybeScheduleCompaction();
}
}
return s;
}
bool DBImpl::HasLargeValues(const WriteBatch& batch) const {
if (WriteBatchInternal::ByteSize(&batch) >= options_.large_value_threshold) {
for (WriteBatchInternal::Iterator it(batch); !it.Done(); it.Next()) {
@@ -1033,9 +1134,10 @@ Status DBImpl::HandleLargeValues(SequenceNumber assigned_seq,
MaybeCompressLargeValue(
it.value(), &file_bytes, &scratch, &large_ref);
InternalKey ikey(it.key(), seq, kTypeLargeValueRef);
if (versions_->RegisterLargeValueRef(large_ref, log_number_,ikey)) {
if (versions_->RegisterLargeValueRef(
large_ref, versions_->LogNumber(), ikey)) {
// TODO(opt): avoid holding the lock here (but be careful about
// another thread doing a Write and changing log_number_ or
// another thread doing a Write and switching logs or
// having us get a different "assigned_seq" value).
uint64_t tmp_number = versions_->NewFileNumber();
@@ -1086,7 +1188,9 @@ Status DBImpl::HandleLargeValues(SequenceNumber assigned_seq,
return Status::OK();
}
bool DBImpl::GetProperty(const Slice& property, uint64_t* value) {
bool DBImpl::GetProperty(const Slice& property, std::string* value) {
value->clear();
MutexLock l(&mutex_);
Slice in = property;
Slice prefix("leveldb.");
@@ -1100,10 +1204,37 @@ bool DBImpl::GetProperty(const Slice& property, uint64_t* value) {
if (!ok || level < 0 || level >= config::kNumLevels) {
return false;
} else {
*value = versions_->NumLevelFiles(level);
char buf[100];
snprintf(buf, sizeof(buf), "%d", versions_->NumLevelFiles(level));
*value = buf;
return true;
}
} else if (in == "stats") {
char buf[200];
snprintf(buf, sizeof(buf),
" Compactions\n"
"Level Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
"--------------------------------------------------\n"
);
value->append(buf);
for (int level = 0; level < config::kNumLevels; level++) {
int files = versions_->NumLevelFiles(level);
if (stats_[level].micros > 0 || files > 0) {
snprintf(
buf, sizeof(buf),
"%3d %8d %8.0f %9.0f %8.0f %9.0f\n",
level,
files,
versions_->NumLevelBytes(level) / 1048576.0,
stats_[level].micros / 1e6,
stats_[level].bytes_read / 1048576.0,
stats_[level].bytes_written / 1048576.0);
value->append(buf);
}
}
return true;
}
return false;
}
@@ -1158,14 +1289,15 @@ Status DB::Open(const Options& options, const std::string& dbname,
VersionEdit edit;
Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
if (s.ok()) {
impl->log_number_ = impl->versions_->NewFileNumber();
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile* lfile;
s = options.env->NewWritableFile(LogFileName(dbname, impl->log_number_),
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
&lfile);
if (s.ok()) {
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
impl->log_ = new log::Writer(lfile);
s = impl->Install(&edit, impl->log_number_, NULL);
s = impl->versions_->LogAndApply(&edit, NULL);
}
if (s.ok()) {
impl->DeleteObsoleteFiles();