xahaud/db/db_impl.cc
Vinnie Falco 596a35acca Squashed 'src/hyperleveldb/' changes from ac2ae30..25511b7
25511b7 Merge branch 'master' of github.com:rescrv/HyperLevelDB into hyperdb
ed01020 Make "source" universal
3784d92 Ignore the static file
507319b Don't package with snappy
3e2cc8b Tolerate -fno-rtti
4dcdd6e Drop revision down to 1.0.dev
2542163 Drop all but the latest kept for garbage reasons
9c270b7 Update .gitignore
5331878 Add upack script
adc2a7a Explicitly add -lpthread for Ubuntu
7b57bbd Strip NULL chars passed to LiveBackup
e3b87e7 Add write-buffer-size option to benchmark
2f11087 Followup to snappy support with -DSNAPPY
af503da Improve efficiency of ReplayIterator; fix a bug
33c1f0c Add snappy support
ce1cacf Fix a race in ReplayIterator
5c4679b Fix a bug in the replay_iterator
ca332bd Fix sort algorithm used for compaction boundaries.
d9ec544 make checK
b83a9cd Fix a deadlock in the ReplayIterator dtor
273547b Fix a double-delete in ReplayIterator
3377c7a Add "all" to set of special timestamps
387f43a Timestamp comparison and validation.
f9a6eb1 make distcheck
9a4d0b7 Add a ReplayIterator.
1d53869 Conditionally enable read-driven compaction.
f6fa561 16% end-to-end performance improvement from the skiplist
28ffd32 Merge remote-tracking branch 'upstream/master'
a58de73 Revert "Remove read-driven compactions."
e19fc0c Fix upstream issue 200
748539c LevelDB 1.13
78b7812 Add install instructions to README
e47a48e Make benchmark dir variable
820a096 Update distributed files.
486ca7f Live backup of LevelDB instances
6579884 Put a reference counter on log_/logfile_
3075253 Update internal benchmark.
2a6b0bd Make the Version a parameter of PickCompaction
5bd76dc Release leveldb 1.12

git-subtree-dir: src/hyperleveldb
git-subtree-split: 25511b7a9101b0bafb57349d2194ba80ccbf7bc3
2013-11-19 11:32:55 -08:00

1922 lines
57 KiB
C++

// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db_impl.h"
#include <algorithm>
#include <iostream>
#include <set>
#include <string>
#include <stdint.h>
#include <stdio.h>
#include <vector>
#include "builder.h"
#include "db_iter.h"
#include "dbformat.h"
#include "filename.h"
#include "log_reader.h"
#include "log_writer.h"
#include "memtable.h"
#include "replay_iterator.h"
#include "table_cache.h"
#include "version_set.h"
#include "write_batch_internal.h"
#include "../hyperleveldb/db.h"
#include "../hyperleveldb/env.h"
#include "../hyperleveldb/replay_iterator.h"
#include "../hyperleveldb/status.h"
#include "../hyperleveldb/table.h"
#include "../hyperleveldb/table_builder.h"
#include "../port/port.h"
#include "../table/block.h"
#include "../table/merger.h"
#include "../table/two_level_iterator.h"
#include "../util/coding.h"
#include "../util/logging.h"
#include "../util/mutexlock.h"
namespace hyperleveldb {
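// After this many consecutive reads with no intervening write,
// compaction selection may become read-driven (see straight_reads_
// and the NeedsCompaction/PickCompactionLevel calls below;
// straight_reads_ is reset to zero in SequenceWriteBegin).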
const int kStraightReads = 50;
const int kNumNonTableCacheFiles = 10;
struct DBImpl::CompactionState {
Compaction* const compaction;
// Sequence numbers < smallest_snapshot are not significant since we
// will never have to service a snapshot below smallest_snapshot.
// Therefore if we have seen a sequence number S <= smallest_snapshot,
// we can drop all entries for the same key with sequence numbers < S.
SequenceNumber smallest_snapshot;
// Files produced by compaction
struct Output {
uint64_t number;
uint64_t file_size;
InternalKey smallest, largest;
};
std::vector<Output> outputs;
// State kept for output being generated
WritableFile* outfile;
TableBuilder* builder;
uint64_t total_bytes;
Output* current_output() { return &outputs[outputs.size()-1]; }
explicit CompactionState(Compaction* c)
: compaction(c),
outfile(NULL),
builder(NULL),
total_bytes(0) {
}
};
// Fix user-supplied options to be reasonable
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
}
Options SanitizeOptions(const std::string& dbname,
const InternalKeyComparator* icmp,
const InternalFilterPolicy* ipolicy,
const Options& src) {
Options result = src;
result.comparator = icmp;
result.filter_policy = (src.filter_policy != NULL) ? ipolicy : NULL;
ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000);
ClipToRange(&result.write_buffer_size, 64<<10, 1<<30);
ClipToRange(&result.block_size, 1<<10, 4<<20);
if (result.info_log == NULL) {
// Open a log file in the same directory as the db
src.env->CreateDir(dbname); // In case it does not exist
src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
if (!s.ok()) {
// No place suitable for logging
result.info_log = NULL;
}
}
if (result.block_cache == NULL) {
result.block_cache = NewLRUCache(8 << 20);
}
return result;
}
DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
: env_(raw_options.env),
internal_comparator_(raw_options.comparator),
internal_filter_policy_(raw_options.filter_policy),
options_(SanitizeOptions(dbname, &internal_comparator_,
&internal_filter_policy_, raw_options)),
owns_info_log_(options_.info_log != raw_options.info_log),
owns_cache_(options_.block_cache != raw_options.block_cache),
dbname_(dbname),
db_lock_(NULL),
shutting_down_(NULL),
mem_(new MemTable(internal_comparator_)),
imm_(NULL),
logfile_(),
logfile_number_(0),
log_(),
seed_(0),
writers_lower_(0),
writers_upper_(0),
bg_fg_cv_(&mutex_),
allow_background_activity_(false),
num_bg_threads_(0),
bg_compaction_cv_(&mutex_),
bg_memtable_cv_(&mutex_),
bg_optimistic_trip_(false),
bg_optimistic_cv_(&mutex_),
bg_log_cv_(&mutex_),
bg_log_occupied_(false),
manual_compaction_(NULL),
manual_garbage_cutoff_(raw_options.manual_garbage_collection ?
SequenceNumber(0) : kMaxSequenceNumber),
straight_reads_(0),
backup_cv_(&mutex_),
backup_in_progress_(),
backup_deferred_delete_(),
consecutive_compaction_errors_(0) {
mutex_.Lock();
mem_->Ref();
has_imm_.Release_Store(NULL);
backup_in_progress_.Release_Store(NULL);
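// Start the three background threads: one flushes immutable
// memtables, one performs opportunistic trivial-move compactions,
// and one runs ordinary level compactions.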
env_->StartThread(&DBImpl::CompactMemTableWrapper, this);
env_->StartThread(&DBImpl::CompactOptimisticWrapper, this);
env_->StartThread(&DBImpl::CompactLevelWrapper, this);
num_bg_threads_ = 3;
// Reserve ten files or so for other uses and give the rest to TableCache.
const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles;
table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
versions_ = new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_);
for (int i = 0; i < config::kNumLevels; ++i) {
levels_locked_[i] = false;
}
mutex_.Unlock();
}
DBImpl::~DBImpl() {
// Wait for background work to finish
mutex_.Lock();
shutting_down_.Release_Store(this); // Any non-NULL value is ok
bg_optimistic_cv_.SignalAll();
bg_compaction_cv_.SignalAll();
bg_memtable_cv_.SignalAll();
while (num_bg_threads_ > 0) {
bg_fg_cv_.Wait();
}
mutex_.Unlock();
if (db_lock_ != NULL) {
env_->UnlockFile(db_lock_);
}
delete versions_;
if (mem_ != NULL) mem_->Unref();
if (imm_ != NULL) imm_->Unref();
log_.reset();
logfile_.reset();
delete table_cache_;
if (owns_info_log_) {
delete options_.info_log;
}
if (owns_cache_) {
delete options_.block_cache;
}
}
Status DBImpl::NewDB() {
VersionEdit new_db;
new_db.SetComparatorName(user_comparator()->Name());
new_db.SetLogNumber(0);
new_db.SetNextFile(2);
new_db.SetLastSequence(0);
const std::string manifest = DescriptorFileName(dbname_, 1);
WritableFile* file;
Status s = env_->NewWritableFile(manifest, &file);
if (!s.ok()) {
return s;
}
{
log::Writer log(file);
std::string record;
new_db.EncodeTo(&record);
s = log.AddRecord(record);
if (s.ok()) {
s = file->Close();
}
}
delete file;
if (s.ok()) {
// Make "CURRENT" file that points to the new manifest file.
s = SetCurrentFile(env_, dbname_, 1);
} else {
env_->DeleteFile(manifest);
}
return s;
}
void DBImpl::MaybeIgnoreError(Status* s) const {
if (s->ok() || options_.paranoid_checks) {
// No change needed
} else {
Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
*s = Status::OK();
}
}
void DBImpl::DeleteObsoleteFiles() {
// Defer if there's background activity
mutex_.AssertHeld();
if (backup_in_progress_.Acquire_Load() != NULL) {
backup_deferred_delete_ = true;
return;
}
// If you ever release mutex_ in this function, you'll need to do more work in
// LiveBackup
// Make a set of all of the live files
std::set<uint64_t> live = pending_outputs_;
versions_->AddLiveFiles(&live);
std::vector<std::string> filenames;
env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
uint64_t number;
FileType type;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) {
bool keep = true;
switch (type) {
case kLogFile:
keep = ((number >= versions_->LogNumber()) ||
(number == versions_->PrevLogNumber()));
break;
case kDescriptorFile:
// Keep my manifest file, and any newer incarnations'
// (in case there is a race that allows other incarnations)
keep = (number >= versions_->ManifestFileNumber());
break;
case kTableFile:
keep = (live.find(number) != live.end());
break;
case kTempFile:
// Any temp files that are currently being written to must
// be recorded in pending_outputs_, which is inserted into "live"
keep = (live.find(number) != live.end());
break;
case kCurrentFile:
case kDBLockFile:
case kInfoLogFile:
keep = true;
break;
}
if (!keep) {
if (type == kTableFile) {
table_cache_->Evict(number);
}
Log(options_.info_log, "Delete type=%d #%lld\n",
int(type),
static_cast<unsigned long long>(number));
env_->DeleteFile(dbname_ + "/" + filenames[i]);
}
}
}
}
Status DBImpl::Recover(VersionEdit* edit) {
mutex_.AssertHeld();
// Ignore error from CreateDir since the creation of the DB is
// committed only when the descriptor is created, and this directory
// may already exist from a previous failed creation attempt.
env_->CreateDir(dbname_);
assert(db_lock_ == NULL);
Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
if (!s.ok()) {
return s;
}
if (!env_->FileExists(CurrentFileName(dbname_))) {
if (options_.create_if_missing) {
s = NewDB();
if (!s.ok()) {
return s;
}
} else {
return Status::InvalidArgument(
dbname_, "does not exist (create_if_missing is false)");
}
} else {
if (options_.error_if_exists) {
return Status::InvalidArgument(
dbname_, "exists (error_if_exists is true)");
}
}
s = versions_->Recover();
if (s.ok()) {
SequenceNumber max_sequence(0);
// Recover from all newer log files than the ones named in the
// descriptor (new log files may have been added by the previous
// incarnation without registering them in the descriptor).
//
// Note that PrevLogNumber() is no longer used, but we pay
// attention to it in case we are recovering a database
// produced by an older version of leveldb.
const uint64_t min_log = versions_->LogNumber();
const uint64_t prev_log = versions_->PrevLogNumber();
std::vector<std::string> filenames;
s = env_->GetChildren(dbname_, &filenames);
if (!s.ok()) {
return s;
}
std::set<uint64_t> expected;
versions_->AddLiveFiles(&expected);
uint64_t number;
FileType type;
std::vector<uint64_t> logs;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) {
expected.erase(number);
if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
logs.push_back(number);
}
}
if (!expected.empty()) {
char buf[50];
snprintf(buf, sizeof(buf), "%d missing files; e.g.",
static_cast<int>(expected.size()));
return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
}
// Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) {
s = RecoverLogFile(logs[i], edit, &max_sequence);
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(logs[i]);
}
if (s.ok()) {
if (versions_->LastSequence() < max_sequence) {
versions_->SetLastSequence(max_sequence);
}
}
}
return s;
}
Status DBImpl::RecoverLogFile(uint64_t log_number,
VersionEdit* edit,
SequenceNumber* max_sequence) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
const char* fname;
Status* status; // NULL if options_.paranoid_checks==false
virtual void Corruption(size_t bytes, const Status& s) {
Log(info_log, "%s%s: dropping %d bytes; %s",
(this->status == NULL ? "(ignoring error) " : ""),
fname, static_cast<int>(bytes), s.ToString().c_str());
if (this->status != NULL && this->status->ok()) *this->status = s;
}
};
mutex_.AssertHeld();
// Open the log file
std::string fname = LogFileName(dbname_, log_number);
SequentialFile* file;
Status status = env_->NewSequentialFile(fname, &file);
if (!status.ok()) {
MaybeIgnoreError(&status);
return status;
}
// Create the log reader.
LogReporter reporter;
reporter.env = env_;
reporter.info_log = options_.info_log;
reporter.fname = fname.c_str();
reporter.status = (options_.paranoid_checks ? &status : NULL);
// We intentionally make log::Reader do checksumming even if
// paranoid_checks==false so that corruptions cause entire commits
// to be skipped instead of propagating bad information (like overly
// large sequence numbers).
log::Reader reader(file, &reporter, true/*checksum*/,
0/*initial_offset*/);
Log(options_.info_log, "Recovering log #%llu",
(unsigned long long) log_number);
// Read all the records and add to a memtable
std::string scratch;
Slice record;
WriteBatch batch;
MemTable* mem = NULL;
while (reader.ReadRecord(&record, &scratch) &&
status.ok()) {
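// A WriteBatch header is 12 bytes: an 8-byte sequence number
// followed by a 4-byte count.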
if (record.size() < 12) {
reporter.Corruption(
record.size(), Status::Corruption("log record too small"));
continue;
}
WriteBatchInternal::SetContents(&batch, record);
if (mem == NULL) {
mem = new MemTable(internal_comparator_);
mem->Ref();
}
status = WriteBatchInternal::InsertInto(&batch, mem);
MaybeIgnoreError(&status);
if (!status.ok()) {
break;
}
const SequenceNumber last_seq =
WriteBatchInternal::Sequence(&batch) +
WriteBatchInternal::Count(&batch) - 1;
if (last_seq > *max_sequence) {
*max_sequence = last_seq;
}
if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
status = WriteLevel0Table(mem, edit, NULL, NULL);
if (!status.ok()) {
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
break;
}
mem->Unref();
mem = NULL;
}
}
if (status.ok() && mem != NULL) {
status = WriteLevel0Table(mem, edit, NULL, NULL);
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
}
if (mem != NULL) mem->Unref();
delete file;
return status;
}
Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
Version* base, uint64_t* number) {
mutex_.AssertHeld();
const uint64_t start_micros = env_->NowMicros();
FileMetaData meta;
meta.number = versions_->NewFileNumber();
if (number) {
*number = meta.number;
}
pending_outputs_.insert(meta.number);
Iterator* iter = mem->NewIterator();
Log(options_.info_log, "Level-0 table #%llu: started",
(unsigned long long) meta.number);
Status s;
{
mutex_.Unlock();
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
mutex_.Lock();
}
Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
(unsigned long long) meta.number,
(unsigned long long) meta.file_size,
s.ToString().c_str());
delete iter;
// Note that if file_size is zero, the file has been deleted and
// should not be added to the manifest.
int level = 0;
if (s.ok() && meta.file_size > 0) {
const Slice min_user_key = meta.smallest.user_key();
const Slice max_user_key = meta.largest.user_key();
if (base != NULL) {
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
while (level > 0 && levels_locked_[level]) {
--level;
}
}
edit->AddFile(level, meta.number, meta.file_size,
meta.smallest, meta.largest);
}
CompactionStats stats;
stats.micros = env_->NowMicros() - start_micros;
stats.bytes_written = meta.file_size;
stats_[level].Add(stats);
return s;
}
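// Background thread: waits until an immutable memtable (imm_) is
// installed, flushes it with WriteLevel0Table, and records via the
// version edit that earlier log files are no longer needed. On
// failure it backs off for one second before retrying.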
void DBImpl::CompactMemTableThread() {
MutexLock l(&mutex_);
while (!shutting_down_.Acquire_Load() && !allow_background_activity_) {
bg_memtable_cv_.Wait();
}
while (!shutting_down_.Acquire_Load()) {
while (!shutting_down_.Acquire_Load() && imm_ == NULL) {
bg_memtable_cv_.Wait();
}
if (shutting_down_.Acquire_Load()) {
break;
}
// Save the contents of the memtable as a new Table
VersionEdit edit;
Version* base = versions_->current();
base->Ref();
uint64_t number;
Status s = WriteLevel0Table(imm_, &edit, base, &number);
base->Unref(); base = NULL;
if (s.ok() && shutting_down_.Acquire_Load()) {
s = Status::IOError("Deleting DB during memtable compaction");
}
// Replace immutable memtable with the generated Table
if (s.ok()) {
edit.SetPrevLogNumber(0);
edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
s = versions_->LogAndApply(&edit, &mutex_, &bg_log_cv_, &bg_log_occupied_);
}
pending_outputs_.erase(number);
if (s.ok()) {
// Commit to the new state
imm_->Unref();
imm_ = NULL;
has_imm_.Release_Store(NULL);
bg_fg_cv_.SignalAll();
bg_compaction_cv_.Signal();
DeleteObsoleteFiles();
}
if (!shutting_down_.Acquire_Load() && !s.ok()) {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
// chew up resources for failed compactions for the duration of
// the problem.
bg_fg_cv_.SignalAll(); // In case a waiter can proceed despite the error
Log(options_.info_log, "Waiting after memtable compaction error: %s",
s.ToString().c_str());
mutex_.Unlock();
env_->SleepForMicroseconds(1000000);
mutex_.Lock();
}
assert(config::kL0_SlowdownWritesTrigger > 0);
if (versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger - 1) {
bg_optimistic_trip_ = true;
bg_optimistic_cv_.Signal();
}
}
Log(options_.info_log, "cleaning up CompactMemTableThread");
num_bg_threads_ -= 1;
bg_fg_cv_.SignalAll();
}
void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
int max_level_with_files = 1;
{
MutexLock l(&mutex_);
Version* base = versions_->current();
for (int level = 1; level < config::kNumLevels; level++) {
if (base->OverlapInLevel(level, begin, end)) {
max_level_with_files = level;
}
}
}
TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
for (int level = 0; level < max_level_with_files; level++) {
TEST_CompactRange(level, begin, end);
}
}
void DBImpl::TEST_CompactRange(int level, const Slice* begin, const Slice* end) {
assert(level >= 0);
assert(level + 1 < config::kNumLevels);
InternalKey begin_storage, end_storage;
ManualCompaction manual;
manual.level = level;
manual.done = false;
if (begin == NULL) {
manual.begin = NULL;
} else {
begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
manual.begin = &begin_storage;
}
if (end == NULL) {
manual.end = NULL;
} else {
end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
manual.end = &end_storage;
}
MutexLock l(&mutex_);
while (!manual.done) {
while (manual_compaction_ != NULL) {
bg_fg_cv_.Wait();
}
manual_compaction_ = &manual;
bg_compaction_cv_.Signal();
bg_memtable_cv_.Signal();
while (manual_compaction_ == &manual) {
bg_fg_cv_.Wait();
}
}
}
Status DBImpl::TEST_CompactMemTable() {
// NULL batch means just wait for earlier writes to be done
Status s = Write(WriteOptions(), NULL);
if (s.ok()) {
// Wait until the compaction completes
MutexLock l(&mutex_);
while (imm_ != NULL && bg_error_.ok()) {
bg_fg_cv_.Wait();
}
if (imm_ != NULL) {
s = bg_error_;
}
}
return s;
}
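// Background thread: runs level compactions whenever
// VersionSet::NeedsCompaction reports one is due or a manual
// compaction is queued. Consecutive failures back off
// exponentially: 1s, 2s, 4s, capped at 8s.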
void DBImpl::CompactLevelThread() {
MutexLock l(&mutex_);
while (!shutting_down_.Acquire_Load() && !allow_background_activity_) {
bg_compaction_cv_.Wait();
}
while (!shutting_down_.Acquire_Load()) {
while (!shutting_down_.Acquire_Load() &&
manual_compaction_ == NULL &&
!versions_->NeedsCompaction(levels_locked_, straight_reads_ > kStraightReads)) {
bg_compaction_cv_.Wait();
}
if (shutting_down_.Acquire_Load()) {
break;
}
assert(manual_compaction_ == NULL || num_bg_threads_ == 3);
Status s = BackgroundCompaction();
bg_fg_cv_.SignalAll(); // Signal before the backoff, in case a waiter
// can proceed despite the error.
if (s.ok()) {
// Success
consecutive_compaction_errors_ = 0;
} else if (shutting_down_.Acquire_Load()) {
// Error most likely due to shutdown; do not wait
} else {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
// chew up resources for failed compactions for the duration of
// the problem.
Log(options_.info_log, "Waiting after background compaction error: %s",
s.ToString().c_str());
mutex_.Unlock();
++consecutive_compaction_errors_;
int seconds_to_sleep = 1;
for (int i = 0; i < 3 && i < consecutive_compaction_errors_ - 1; ++i) {
seconds_to_sleep *= 2;
}
env_->SleepForMicroseconds(seconds_to_sleep * 1000000);
mutex_.Lock();
}
}
Log(options_.info_log, "cleaning up CompactLevelThread");
num_bg_threads_ -= 1;
bg_fg_cv_.SignalAll();
}
Status DBImpl::BackgroundCompaction() {
mutex_.AssertHeld();
Compaction* c = NULL;
bool is_manual = (manual_compaction_ != NULL);
InternalKey manual_end;
if (is_manual) {
ManualCompaction* m = manual_compaction_;
c = versions_->CompactRange(m->level, m->begin, m->end);
m->done = (c == NULL);
if (c != NULL) {
manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
}
Log(options_.info_log,
"Manual compaction at level-%d from %s .. %s; will stop at %s\n",
m->level,
(m->begin ? m->begin->DebugString().c_str() : "(begin)"),
(m->end ? m->end->DebugString().c_str() : "(end)"),
(m->done ? "(end)" : manual_end.DebugString().c_str()));
} else {
int level = versions_->PickCompactionLevel(levels_locked_, straight_reads_ > kStraightReads);
if (level != config::kNumLevels) {
c = versions_->PickCompaction(versions_->current(), level);
}
if (c) {
assert(!levels_locked_[c->level() + 0]);
assert(!levels_locked_[c->level() + 1]);
levels_locked_[c->level() + 0] = true;
levels_locked_[c->level() + 1] = true;
}
}
Status status;
if (c == NULL) {
// Nothing to do
} else if (!is_manual && c->IsTrivialMove() && c->level() > 0) {
// Move file to next level
for (size_t i = 0; i < c->num_input_files(0); ++i) {
FileMetaData* f = c->input(0, i);
c->edit()->DeleteFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest);
}
status = versions_->LogAndApply(c->edit(), &mutex_, &bg_log_cv_, &bg_log_occupied_);
VersionSet::LevelSummaryStorage tmp;
for (size_t i = 0; i < c->num_input_files(0); ++i) {
FileMetaData* f = c->input(0, i);
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number),
c->level() + 1,
static_cast<unsigned long long>(f->file_size),
status.ToString().c_str(),
versions_->LevelSummary(&tmp));
}
} else {
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
CleanupCompaction(compact);
c->ReleaseInputs();
DeleteObsoleteFiles();
}
if (c) {
levels_locked_[c->level() + 0] = false;
levels_locked_[c->level() + 1] = false;
delete c;
}
if (status.ok()) {
// Done
} else if (shutting_down_.Acquire_Load()) {
// Ignore compaction errors found during shutting down
} else {
Log(options_.info_log,
"Compaction error: %s", status.ToString().c_str());
if (options_.paranoid_checks && bg_error_.ok()) {
bg_error_ = status;
}
}
if (is_manual) {
ManualCompaction* m = manual_compaction_;
if (!status.ok()) {
m->done = true;
}
if (!m->done) {
// We only compacted part of the requested range. Update *m
// to the range that is left to be compacted.
m->tmp_storage = manual_end;
m->begin = &m->tmp_storage;
}
manual_compaction_ = NULL;
}
return status;
}
void DBImpl::CompactOptimisticThread() {
MutexLock l(&mutex_);
while (!shutting_down_.Acquire_Load() && !allow_background_activity_) {
bg_optimistic_cv_.Wait();
}
while (!shutting_down_.Acquire_Load()) {
while (!shutting_down_.Acquire_Load() && !bg_optimistic_trip_) {
bg_optimistic_cv_.Wait();
}
if (shutting_down_.Acquire_Load()) {
break;
}
bg_optimistic_trip_ = false;
Status s = OptimisticCompaction();
if (!shutting_down_.Acquire_Load() && !s.ok()) {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
// chew up resources for failed compactions for the duration of
// the problem.
Log(options_.info_log, "Waiting after optimistic compaction error: %s",
s.ToString().c_str());
mutex_.Unlock();
env_->SleepForMicroseconds(1000000);
mutex_.Lock();
}
}
Log(options_.info_log, "cleaning up OptimisticCompactThread");
num_bg_threads_ -= 1;
bg_fg_cv_.SignalAll();
}
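// Greedy pass over levels 1..kNumLevels-2: repeatedly pick the best
// unlocked compaction, preferring trivial moves; a non-trivial
// candidate is taken only when its ratio() is at least 0.90. Loops
// until no level yields a compaction.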
Status DBImpl::OptimisticCompaction() {
mutex_.AssertHeld();
Log(options_.info_log, "Optimistic compaction started");
bool did_compaction = true;
uint64_t iters = 0;
while (did_compaction) {
++iters;
did_compaction = false;
Compaction* c = NULL;
for (size_t level = 1; level + 1 < config::kNumLevels; ++level) {
if (levels_locked_[level] || levels_locked_[level + 1]) {
continue;
}
Compaction* tmp = versions_->PickCompaction(versions_->current(), level);
if (tmp && tmp->IsTrivialMove()) {
if (c) {
delete c;
}
c = tmp;
break;
} else if (c && tmp && c->ratio() < tmp->ratio()) {
delete c;
c = tmp;
} else if (!c) {
c = tmp;
} else {
delete tmp;
}
}
if (!c) {
continue;
}
if (!c->IsTrivialMove() && c->ratio() < .90) {
delete c;
continue;
}
assert(!levels_locked_[c->level() + 0]);
assert(!levels_locked_[c->level() + 1]);
levels_locked_[c->level() + 0] = true;
levels_locked_[c->level() + 1] = true;
did_compaction = true;
Status status;
if (c->IsTrivialMove() && c->level() > 0) {
// Move file to next level
for (size_t i = 0; i < c->num_input_files(0); ++i) {
FileMetaData* f = c->input(0, i);
c->edit()->DeleteFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest);
}
status = versions_->LogAndApply(c->edit(), &mutex_, &bg_log_cv_, &bg_log_occupied_);
VersionSet::LevelSummaryStorage tmp;
for (size_t i = 0; i < c->num_input_files(0); ++i) {
FileMetaData* f = c->input(0, i);
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number),
c->level() + 1,
static_cast<unsigned long long>(f->file_size),
status.ToString().c_str(),
versions_->LevelSummary(&tmp));
}
} else {
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
CleanupCompaction(compact);
c->ReleaseInputs();
DeleteObsoleteFiles();
}
levels_locked_[c->level() + 0] = false;
levels_locked_[c->level() + 1] = false;
delete c;
if (status.ok()) {
// Done
} else if (shutting_down_.Acquire_Load()) {
// Ignore compaction errors found during shutting down
break;
} else {
Log(options_.info_log,
"Compaction error: %s", status.ToString().c_str());
if (options_.paranoid_checks && bg_error_.ok()) {
bg_error_ = status;
}
break;
}
}
Log(options_.info_log, "Optimistic compaction ended after %ld iterations", iters);
return Status::OK();
}
void DBImpl::CleanupCompaction(CompactionState* compact) {
mutex_.AssertHeld();
if (compact->builder != NULL) {
// May happen if we get a shutdown call in the middle of compaction
compact->builder->Abandon();
delete compact->builder;
} else {
assert(compact->outfile == NULL);
}
delete compact->outfile;
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
pending_outputs_.erase(out.number);
}
delete compact;
}
Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
assert(compact != NULL);
assert(compact->builder == NULL);
uint64_t file_number;
{
mutex_.Lock();
file_number = versions_->NewFileNumber();
pending_outputs_.insert(file_number);
CompactionState::Output out;
out.number = file_number;
out.smallest.Clear();
out.largest.Clear();
compact->outputs.push_back(out);
mutex_.Unlock();
}
// Make the output file
std::string fname = TableFileName(dbname_, file_number);
Status s = env_->NewWritableFile(fname, &compact->outfile);
if (s.ok()) {
compact->builder = new TableBuilder(options_, compact->outfile);
}
return s;
}
Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
Iterator* input) {
assert(compact != NULL);
assert(compact->outfile != NULL);
assert(compact->builder != NULL);
const uint64_t output_number = compact->current_output()->number;
assert(output_number != 0);
// Check for iterator errors
Status s = input->status();
const uint64_t current_entries = compact->builder->NumEntries();
if (s.ok()) {
s = compact->builder->Finish();
} else {
compact->builder->Abandon();
}
const uint64_t current_bytes = compact->builder->FileSize();
compact->current_output()->file_size = current_bytes;
compact->total_bytes += current_bytes;
delete compact->builder;
compact->builder = NULL;
// Finish and check for file errors
if (s.ok()) {
s = compact->outfile->Sync();
}
if (s.ok()) {
s = compact->outfile->Close();
}
delete compact->outfile;
compact->outfile = NULL;
if (s.ok() && current_entries > 0) {
// Verify that the table is usable
Iterator* iter = table_cache_->NewIterator(ReadOptions(),
output_number,
current_bytes);
s = iter->status();
delete iter;
if (s.ok()) {
Log(options_.info_log,
"Generated table #%llu: %lld keys, %lld bytes",
(unsigned long long) output_number,
(unsigned long long) current_entries,
(unsigned long long) current_bytes);
}
}
return s;
}
Status DBImpl::InstallCompactionResults(CompactionState* compact) {
mutex_.AssertHeld();
Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
compact->compaction->num_input_files(0),
compact->compaction->level(),
compact->compaction->num_input_files(1),
compact->compaction->level() + 1,
static_cast<long long>(compact->total_bytes));
// Add compaction outputs
compact->compaction->AddInputDeletions(compact->compaction->edit());
const int level = compact->compaction->level();
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
compact->compaction->edit()->AddFile(
level + 1,
out.number, out.file_size, out.smallest, out.largest);
}
return versions_->LogAndApply(compact->compaction->edit(), &mutex_, &bg_log_cv_, &bg_log_occupied_);
}
Status DBImpl::DoCompactionWork(CompactionState* compact) {
const uint64_t start_micros = env_->NowMicros();
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
Log(options_.info_log, "Compacting %d@%d + %d@%d files",
compact->compaction->num_input_files(0),
compact->compaction->level(),
compact->compaction->num_input_files(1),
compact->compaction->level() + 1);
assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
assert(compact->builder == NULL);
assert(compact->outfile == NULL);
if (snapshots_.empty()) {
compact->smallest_snapshot = versions_->LastSequence();
} else {
compact->smallest_snapshot = snapshots_.oldest()->number_;
}
// Release mutex while we're actually doing the compaction work
mutex_.Unlock();
Iterator* input = versions_->MakeInputIterator(compact->compaction);
input->SeekToFirst();
Status status;
ParsedInternalKey ikey;
std::string current_user_key;
bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
while (input->Valid() && !shutting_down_.Acquire_Load()) {
Slice key = input->key();
// Handle key/value, add to state, etc.
bool drop = false;
if (!ParseInternalKey(key, &ikey)) {
// Do not hide error keys
current_user_key.clear();
has_current_user_key = false;
last_sequence_for_key = kMaxSequenceNumber;
} else {
if (!has_current_user_key ||
user_comparator()->Compare(ikey.user_key,
Slice(current_user_key)) != 0) {
// First occurrence of this user key
current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber;
}
// last_sequence_for_key is the sequence number of the previous
// (newer) entry for this user key, or kMaxSequenceNumber if this is
// its first occurrence. Entries for a key appear in decreasing
// sequence order, so once a newer entry is at or below
// smallest_snapshot, every older one is invisible to all snapshots.
if (last_sequence_for_key <= compact->smallest_snapshot) {
// Hidden by a newer entry for the same user key
drop = true; // (A)
} else if (ikey.type == kTypeDeletion &&
ikey.sequence <= compact->smallest_snapshot &&
compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
// For this user key:
// (1) there is no data in higher levels
// (2) data in lower levels will have larger sequence numbers
// (3) data in layers that are being compacted here and have
// smaller sequence numbers will be dropped in the next
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
}
// If we're going to drop this key, and there was no previous version of
// this key, and it was written at or after the garbage cutoff, we keep
// it.
if (drop &&
last_sequence_for_key == kMaxSequenceNumber &&
ikey.sequence >= manual_garbage_cutoff_) {
drop = false;
}
last_sequence_for_key = ikey.sequence;
}
if (!drop) {
// Open output file if necessary
if (compact->builder == NULL) {
status = OpenCompactionOutputFile(compact);
if (!status.ok()) {
break;
}
}
if (compact->builder->NumEntries() == 0) {
compact->current_output()->smallest.DecodeFrom(key);
}
compact->current_output()->largest.DecodeFrom(key);
compact->builder->Add(key, input->value());
// Close output file if it is big enough
if (compact->builder->FileSize() >=
compact->compaction->MaxOutputFileSize()) {
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
}
}
input->Next();
}
if (status.ok() && shutting_down_.Acquire_Load()) {
status = Status::IOError("Deleting DB during compaction");
}
if (status.ok() && compact->builder != NULL) {
status = FinishCompactionOutputFile(compact, input);
}
if (status.ok()) {
status = input->status();
}
delete input;
input = NULL;
CompactionStats stats;
stats.micros = env_->NowMicros() - start_micros - imm_micros;
for (int which = 0; which < 2; which++) {
for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
stats.bytes_read += compact->compaction->input(which, i)->file_size;
}
}
for (size_t i = 0; i < compact->outputs.size(); i++) {
stats.bytes_written += compact->outputs[i].file_size;
}
mutex_.Lock();
stats_[compact->compaction->level() + 1].Add(stats);
if (status.ok()) {
status = InstallCompactionResults(compact);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log,
"compacted to: %s", versions_->LevelSummary(&tmp));
return status;
}
namespace {
struct IterState {
port::Mutex* mu;
Version* version;
MemTable* mem;
MemTable* imm;
};
static void CleanupIteratorState(void* arg1, void* arg2) {
IterState* state = reinterpret_cast<IterState*>(arg1);
state->mu->Lock();
state->mem->Unref();
if (state->imm != NULL) state->imm->Unref();
state->version->Unref();
state->mu->Unlock();
delete state;
}
} // namespace
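// Builds a merged iterator over the current memtable, the immutable
// memtable (if any), and the current version's table files. The
// "number" argument is forwarded to Version::AddSomeIterators to
// select which table files participate: replay iterators pass the
// file number captured in their timestamp, ordinary readers pass 0.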
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, uint64_t number,
SequenceNumber* latest_snapshot,
uint32_t* seed, bool external_sync) {
IterState* cleanup = new IterState;
if (!external_sync) {
mutex_.Lock();
}
++straight_reads_;
*latest_snapshot = versions_->LastSequence();
// Collect together all needed child iterators
std::vector<Iterator*> list;
list.push_back(mem_->NewIterator());
mem_->Ref();
if (imm_ != NULL) {
list.push_back(imm_->NewIterator());
imm_->Ref();
}
versions_->current()->AddSomeIterators(options, number, &list);
Iterator* internal_iter =
NewMergingIterator(&internal_comparator_, &list[0], list.size());
versions_->current()->Ref();
cleanup->mu = &mutex_;
cleanup->mem = mem_;
cleanup->imm = imm_;
cleanup->version = versions_->current();
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);
*seed = ++seed_;
if (!external_sync) {
mutex_.Unlock();
}
return internal_iter;
}
Iterator* DBImpl::TEST_NewInternalIterator() {
SequenceNumber ignored;
uint32_t ignored_seed;
return NewInternalIterator(ReadOptions(), 0, &ignored, &ignored_seed, false);
}
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
MutexLock l(&mutex_);
return versions_->MaxNextLevelOverlappingBytes();
}
Status DBImpl::Get(const ReadOptions& options,
const Slice& key,
std::string* value) {
Status s;
MutexLock l(&mutex_);
SequenceNumber snapshot;
if (options.snapshot != NULL) {
snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
} else {
snapshot = versions_->LastSequence();
}
MemTable* mem = mem_;
MemTable* imm = imm_;
Version* current = versions_->current();
mem->Ref();
if (imm != NULL) imm->Ref();
current->Ref();
bool have_stat_update = false;
Version::GetStats stats;
// Unlock while reading from files and memtables
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) {
// Done
} else if (imm != NULL && imm->Get(lkey, value, &s)) {
// Done
} else {
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
mutex_.Lock();
}
if (have_stat_update && current->UpdateStats(stats)) {
bg_compaction_cv_.Signal();
}
++straight_reads_;
mem->Unref();
if (imm != NULL) imm->Unref();
current->Unref();
return s;
}
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
uint32_t seed;
Iterator* iter = NewInternalIterator(options, 0, &latest_snapshot, &seed, false);
return NewDBIterator(
this, user_comparator(), iter,
(options.snapshot != NULL
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot),
seed);
}
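// Replay timestamps are opaque strings: either the special values
// "all" (replay everything) and "now" (start from the current
// sequence number), or two varint64s encoding a file number and a
// sequence number.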
void DBImpl::GetReplayTimestamp(std::string* timestamp) {
uint64_t file = 0;
uint64_t seqno = 0;
{
MutexLock l(&mutex_);
file = versions_->NewFileNumber();
versions_->ReuseFileNumber(file);
seqno = versions_->LastSequence();
}
timestamp->clear();
PutVarint64(timestamp, file);
PutVarint64(timestamp, seqno);
}
void DBImpl::AllowGarbageCollectBeforeTimestamp(const std::string& timestamp) {
Slice ts_slice(timestamp);
uint64_t file = 0;
uint64_t seqno = 0;
if (timestamp == "all") {
// keep zeroes
} else if (timestamp == "now") {
MutexLock l(&mutex_);
seqno = versions_->LastSequence();
if (manual_garbage_cutoff_ < seqno) {
manual_garbage_cutoff_ = seqno;
}
} else if (GetVarint64(&ts_slice, &file) &&
GetVarint64(&ts_slice, &seqno)) {
MutexLock l(&mutex_);
if (manual_garbage_cutoff_ < seqno) {
manual_garbage_cutoff_ = seqno;
}
}
}
bool DBImpl::ValidateTimestamp(const std::string& ts) {
uint64_t file = 0;
uint64_t seqno = 0;
Slice ts_slice(ts);
return ts == "all" || ts == "now" ||
(GetVarint64(&ts_slice, &file) &&
GetVarint64(&ts_slice, &seqno));
}
int DBImpl::CompareTimestamps(const std::string& lhs, const std::string& rhs) {
uint64_t now = 0;
uint64_t lhs_seqno = 0;
uint64_t rhs_seqno = 0;
uint64_t tmp;
if (lhs == "now" || rhs == "now") {
MutexLock l(&mutex_);
now = versions_->LastSequence();
}
if (lhs == "all") {
lhs_seqno = 0;
} else if (lhs == "now") {
lhs_seqno = now;
} else {
Slice lhs_slice(lhs);
GetVarint64(&lhs_slice, &tmp);
GetVarint64(&lhs_slice, &lhs_seqno);
}
if (rhs == "all") {
rhs_seqno = 0;
} else if (rhs == "now") {
rhs_seqno = now;
} else {
Slice rhs_slice(rhs);
GetVarint64(&rhs_slice, &tmp);
GetVarint64(&rhs_slice, &rhs_seqno);
}
if (lhs_seqno < rhs_seqno) {
return -1;
} else if (lhs_seqno > rhs_seqno) {
return 1;
} else {
return 0;
}
}
Status DBImpl::GetReplayIterator(const std::string& timestamp,
ReplayIterator** iter) {
*iter = NULL;
Slice ts_slice(timestamp);
uint64_t file = 0;
uint64_t seqno = 0;
if (timestamp == "all") {
seqno = 0;
} else if (timestamp == "now") {
MutexLock l(&mutex_);
file = versions_->NewFileNumber();
versions_->ReuseFileNumber(file);
seqno = versions_->LastSequence();
} else if (!GetVarint64(&ts_slice, &file) ||
!GetVarint64(&ts_slice, &seqno)) {
return Status::InvalidArgument("Timestamp is not valid");
}
ReadOptions options;
SequenceNumber latest_snapshot;
uint32_t seed;
MutexLock l(&mutex_);
Iterator* internal_iter = NewInternalIterator(options, file, &latest_snapshot, &seed, true);
internal_iter->SeekToFirst();
ReplayIteratorImpl* iterimpl;
iterimpl = new ReplayIteratorImpl(
this, &mutex_, user_comparator(), internal_iter, mem_, SequenceNumber(seqno));
*iter = iterimpl;
replay_iters_.push_back(iterimpl);
return Status::OK();
}
void DBImpl::ReleaseReplayIterator(ReplayIterator* _iter) {
MutexLock l(&mutex_);
ReplayIteratorImpl* iter = reinterpret_cast<ReplayIteratorImpl*>(_iter);
for (std::list<ReplayIteratorImpl*>::iterator it = replay_iters_.begin();
it != replay_iters_.end(); ++it) {
if (*it == iter) {
iter->cleanup(); // calls delete
replay_iters_.erase(it);
return;
}
}
}
void DBImpl::RecordReadSample(Slice key) {
MutexLock l(&mutex_);
++straight_reads_;
if (versions_->current()->RecordReadSample(key)) {
bg_compaction_cv_.Signal();
}
}
SequenceNumber DBImpl::LastSequence() {
MutexLock l(&mutex_);
return versions_->LastSequence();
}
const Snapshot* DBImpl::GetSnapshot() {
MutexLock l(&mutex_);
return snapshots_.New(versions_->LastSequence());
}
void DBImpl::ReleaseSnapshot(const Snapshot* s) {
MutexLock l(&mutex_);
snapshots_.Delete(reinterpret_cast<const SnapshotImpl*>(s));
}
// Convenience methods
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
return DB::Put(o, key, val);
}
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
return DB::Delete(options, key);
}
// Information kept for every waiting writer
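// Each writer holds shared_ptr references to the log and memtable it
// appends to, so a concurrent memtable/log switch cannot release
// those objects while the writer is still using them.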
struct DBImpl::Writer {
port::Mutex mtx;
port::CondVar cv;
bool linked;
Writer* next;
uint64_t start_sequence;
uint64_t end_sequence;
std::tr1::shared_ptr<WritableFile> logfile;
std::tr1::shared_ptr<log::Writer> log;
MemTable* mem;
std::tr1::shared_ptr<WritableFile> old_logfile;
std::tr1::shared_ptr<log::Writer> old_log;
explicit Writer()
: mtx(),
cv(&mtx),
linked(false),
next(NULL),
start_sequence(0),
end_sequence(0),
logfile(),
log(),
mem(NULL),
old_logfile(),
old_log() {
}
~Writer() throw () {
}
};
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer w;
Status s;
s = SequenceWriteBegin(&w, updates);
if (s.ok() && updates != NULL) { // NULL batch is for compactions
WriteBatchInternal::SetSequence(updates, w.start_sequence);
// Add to log and apply to memtable. We do this without holding the lock
// because both the log and the memtable are safe for concurrent access.
// The synchronization with readers occurs with SequenceWriteEnd.
s = w.log->AddRecord(WriteBatchInternal::Contents(updates));
if (s.ok()) {
s = WriteBatchInternal::InsertInto(updates, w.mem);
}
}
if (s.ok() && options.sync) {
s = w.logfile->Sync();
}
SequenceWriteEnd(&w);
return s;
}
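// Concurrent-write protocol: writers_upper_ is an atomic ticket
// counter. Each writer reserves a contiguous range of sequence
// numbers with __sync_add_and_fetch, appends to the log and memtable
// without holding mutex_, and SequenceWriteEnd then publishes the
// new last-sequence in ticket order, gated by writers_lower_.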
Status DBImpl::SequenceWriteBegin(Writer* w, WriteBatch* updates) {
Status s;
MutexLock l(&mutex_);
straight_reads_ = 0;
bool force = updates == NULL;
bool allow_delay = !force;
bool enqueue_mem = false;
w->old_log.reset();
w->old_logfile.reset();
while (true) {
if (!bg_error_.ok()) {
// Yield previous error
s = bg_error_;
break;
} else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
// There is room in current memtable
// Note that this is a sloppy check. We can overfill a memtable by the
// amount of concurrently written data.
break;
} else if (imm_ != NULL) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
bg_compaction_cv_.Signal();
bg_memtable_cv_.Signal();
bg_fg_cv_.Wait();
} else {
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* lfile = NULL;
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
if (!s.ok()) {
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
break;
}
w->old_log = log_;
w->old_logfile = logfile_;
logfile_.reset(lfile);
logfile_number_ = new_log_number;
log_.reset(new log::Writer(lfile));
imm_ = mem_;
has_imm_.Release_Store(imm_);
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room
enqueue_mem = true;
break;
}
}
if (s.ok()) {
w->linked = true;
w->next = NULL;
uint64_t diff = updates ? WriteBatchInternal::Count(updates) : 0;
uint64_t ticket = __sync_add_and_fetch(&writers_upper_, 1 + diff);
w->start_sequence = ticket - diff;
w->end_sequence = ticket;
w->logfile = logfile_;
w->log = log_;
w->mem = mem_;
w->mem->Ref();
}
if (enqueue_mem) {
for (std::list<ReplayIteratorImpl*>::iterator it = replay_iters_.begin();
it != replay_iters_.end(); ++it) {
(*it)->enqueue(mem_, w->start_sequence);
}
}
return s;
}
void DBImpl::SequenceWriteEnd(Writer* w) {
if (!w->linked) {
return;
}
// wait until we are next
while (__sync_fetch_and_add(&writers_lower_, 0) < w->start_sequence)
;
// swizzle state to make ours visible
{
MutexLock l(&mutex_);
versions_->SetLastSequence(w->end_sequence);
}
// signal the next writer
__sync_fetch_and_add(&writers_lower_, 1 + w->end_sequence - w->start_sequence);
// must do in order: log, logfile
if (w->old_log) {
assert(w->old_logfile);
w->old_log.reset();
w->old_logfile.reset();
bg_memtable_cv_.Signal();
}
// safe because Unref is synchronized internally
if (w->mem) {
w->mem->Unref();
}
}
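// Recognized properties, for example:
//   GetProperty("leveldb.num-files-at-level<N>", &v)  file count at level N
//   GetProperty("leveldb.stats", &v)                  per-level compaction stats
//   GetProperty("leveldb.sstables", &v)               current version's DebugString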
bool DBImpl::GetProperty(const Slice& property, std::string* value) {
value->clear();
MutexLock l(&mutex_);
Slice in = property;
Slice prefix("leveldb.");
if (!in.starts_with(prefix)) return false;
in.remove_prefix(prefix.size());
if (in.starts_with("num-files-at-level")) {
in.remove_prefix(strlen("num-files-at-level"));
uint64_t level;
bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
if (!ok || level >= config::kNumLevels) {
return false;
} else {
char buf[100];
snprintf(buf, sizeof(buf), "%d",
versions_->NumLevelFiles(static_cast<int>(level)));
*value = buf;
return true;
}
} else if (in == "stats") {
char buf[200];
snprintf(buf, sizeof(buf),
" Compactions\n"
"Level Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
"--------------------------------------------------\n"
);
value->append(buf);
for (int level = 0; level < config::kNumLevels; level++) {
int files = versions_->NumLevelFiles(level);
if (stats_[level].micros > 0 || files > 0) {
snprintf(
buf, sizeof(buf),
"%3d %8d %8.0f %9.0f %8.0f %9.0f\n",
level,
files,
versions_->NumLevelBytes(level) / 1048576.0,
stats_[level].micros / 1e6,
stats_[level].bytes_read / 1048576.0,
stats_[level].bytes_written / 1048576.0);
value->append(buf);
}
}
return true;
} else if (in == "sstables") {
*value = versions_->current()->DebugString();
return true;
}
return false;
}
void DBImpl::GetApproximateSizes(
const Range* range, int n,
uint64_t* sizes) {
// TODO(opt): better implementation
Version* v;
{
MutexLock l(&mutex_);
versions_->current()->Ref();
v = versions_->current();
}
for (int i = 0; i < n; i++) {
// Convert user_key into a corresponding internal key.
InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
uint64_t start = versions_->ApproximateOffsetOf(v, k1);
uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
sizes[i] = (limit >= start ? limit - start : 0);
}
{
MutexLock l(&mutex_);
v->Unref();
}
}
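// LiveBackup(name) snapshots the database into
// <dbname>/backup-<name>/. It takes a write ticket and spins until
// all earlier writers have committed, blocks DeleteObsoleteFiles via
// backup_in_progress_, then hard-links the live table files and
// copies the log, MANIFEST, CURRENT, and info-log files. The name
// is truncated at the first NUL byte.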
Status DBImpl::LiveBackup(const Slice& _name) {
Slice name = _name;
size_t name_sz = 0;
for (; name_sz < name.size() && name.data()[name_sz] != '\0'; ++name_sz)
;
name = Slice(name.data(), name_sz);
std::set<uint64_t> live;
uint64_t ticket = __sync_add_and_fetch(&writers_upper_, 1);
while (__sync_fetch_and_add(&writers_lower_, 0) < ticket)
;
{
MutexLock l(&mutex_);
versions_->SetLastSequence(ticket);
while (backup_in_progress_.Acquire_Load() != NULL) {
backup_cv_.Wait();
}
backup_in_progress_.Release_Store(this);
while (bg_log_occupied_) {
bg_log_cv_.Wait();
}
bg_log_occupied_ = true;
// note that this logic assumes that DeleteObsoleteFiles never releases
// mutex_, so that once we release at this brace, we'll guarantee that it
// will see backup_in_progress_. If you change DeleteObsoleteFiles to
// release mutex_, you'll need to add some sort of synchronization in place
// of this text block.
versions_->AddLiveFiles(&live);
__sync_fetch_and_add(&writers_lower_, 1);
}
Status s;
std::vector<std::string> filenames;
s = env_->GetChildren(dbname_, &filenames);
std::string backup_dir = dbname_ + "/backup-" + name.ToString() + "/";
if (s.ok()) {
s = env_->CreateDir(backup_dir);
}
uint64_t number;
FileType type;
for (size_t i = 0; i < filenames.size(); i++) {
if (!s.ok()) {
continue;
}
if (ParseFileName(filenames[i], &number, &type)) {
std::string src = dbname_ + "/" + filenames[i];
std::string target = backup_dir + "/" + filenames[i];
switch (type) {
case kLogFile:
case kDescriptorFile:
case kCurrentFile:
case kInfoLogFile:
s = env_->CopyFile(src, target);
break;
case kTableFile:
// If it's a file referenced by a version, we have logged that version
// and applied it. Our MANIFEST will reflect that, and the file
// number assigned to new files will be greater or equal, ensuring
// that they aren't overwritten. Any file not in "live" either exists
// past the current manifest (output of ongoing compaction) or so far
// in the past we don't care (we're going to delete it at the end of
// this backup). Better safe than sorry.
//
// Under no circumstances should you collapse this to a single
// LinkFile without the conditional as it has implications for backups
// that share hardlinks. Opening an older backup that has files
// hardlinked with newer backups will overwrite "immutable" files in
// the newer backups because they aren't in our manifest, and we do an
// open/write rather than a creat/rename. We avoid linking these
// files.
if (live.find(number) != live.end()) {
s = env_->LinkFile(src, target);
}
break;
case kTempFile:
case kDBLockFile:
break;
}
}
}
{
MutexLock l(&mutex_);
backup_in_progress_.Release_Store(NULL);
if (s.ok() && backup_deferred_delete_) {
DeleteObsoleteFiles();
}
backup_deferred_delete_ = false;
bg_log_occupied_ = false;
bg_log_cv_.Signal();
backup_cv_.Signal();
}
return s;
}
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
}
Status DB::Delete(const WriteOptions& opt, const Slice& key) {
WriteBatch batch;
batch.Delete(key);
return Write(opt, &batch);
}
DB::~DB() { }
Status DB::Open(const Options& options, const std::string& dbname,
DB** dbptr) {
*dbptr = NULL;
DBImpl* impl = new DBImpl(options, dbname);
impl->mutex_.Lock();
VersionEdit edit;
Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
if (s.ok()) {
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile* lfile;
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
&lfile);
if (s.ok()) {
edit.SetLogNumber(new_log_number);
impl->logfile_.reset(lfile);
impl->logfile_number_ = new_log_number;
impl->log_.reset(new log::Writer(lfile));
s = impl->versions_->LogAndApply(&edit, &impl->mutex_, &impl->bg_log_cv_, &impl->bg_log_occupied_);
}
if (s.ok()) {
impl->DeleteObsoleteFiles();
impl->bg_optimistic_cv_.Signal();
impl->bg_compaction_cv_.Signal();
impl->bg_memtable_cv_.Signal();
}
}
impl->pending_outputs_.clear();
impl->allow_background_activity_ = true;
impl->bg_optimistic_cv_.SignalAll();
impl->bg_compaction_cv_.SignalAll();
impl->bg_memtable_cv_.SignalAll();
impl->mutex_.Unlock();
if (s.ok()) {
// Seed the writer ticket counters before publishing the DB; doing
// this after "delete impl" on the failure path would be a
// use-after-free.
impl->writers_upper_ = impl->versions_->LastSequence();
impl->writers_lower_ = impl->writers_upper_ + 1;
*dbptr = impl;
} else {
delete impl;
}
return s;
}
Snapshot::~Snapshot() {
}
Status DestroyDB(const std::string& dbname, const Options& options) {
Env* env = options.env;
std::vector<std::string> filenames;
// Ignore error in case directory does not exist
env->GetChildren(dbname, &filenames);
if (filenames.empty()) {
return Status::OK();
}
FileLock* lock;
const std::string lockname = LockFileName(dbname);
Status result = env->LockFile(lockname, &lock);
if (result.ok()) {
uint64_t number;
FileType type;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type) &&
type != kDBLockFile) { // Lock file will be deleted at end
Status del = env->DeleteFile(dbname + "/" + filenames[i]);
if (result.ok() && !del.ok()) {
result = del;
}
}
}
env->UnlockFile(lock); // Ignore error since state is already gone
env->DeleteFile(lockname);
env->DeleteDir(dbname); // Ignore error in case dir contains other files
}
return result;
}
} // namespace hyperleveldb