Merge branch 'master' into performance

Conflicts:
	db/db_impl.cc
	db/db_test.cc
	db/memtable.cc
	db/version_set.cc
	include/rocksdb/statistics.h
	util/statistics_imp.h
Kai Liu
2014-01-23 16:32:49 -08:00
34 changed files with 5190 additions and 439 deletions

db/db_impl.cc

@@ -877,14 +877,11 @@ void DBImpl::PurgeObsoleteWALFiles() {
   }
 }
 
-// If externalTable is set, then apply recovered transactions
-// to that table. This is used for readonly mode.
-Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table,
-                       bool error_if_log_file_exist) {
+Status DBImpl::Recover(bool read_only, bool error_if_log_file_exist) {
   mutex_.AssertHeld();
 
   assert(db_lock_ == nullptr);
-  if (!external_table) {
+  if (!read_only) {
     // We call CreateDirIfMissing() as the directory may already exist (if we
     // are reopening a DB), when this happens we don't want creating the
     // directory to cause an error. However, we need to check if creating the
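As an aside, the shape of the new API is easy to sketch outside RocksDB: a single read_only flag now gates every side effect of recovery (directory creation, descriptor writes, L0 flushes) instead of threading a separate external memtable through the call chain. The snippet below is a minimal stand-alone sketch with made-up types, not DBImpl itself.

```cpp
// Sketch only: stand-in types, not rocksdb::DBImpl. Illustrates how one
// read_only flag replaces the old external_table out-parameter: the same
// Recover() runs for DB::Open() and for read-only opens, and the flag gates
// every on-disk side effect.
#include <iostream>
#include <string>

struct Status {
  bool ok_ = true;
  bool ok() const { return ok_; }
};

struct RecoverySketch {
  Status Recover(bool read_only, bool error_if_log_file_exist = false) {
    if (!read_only) {
      // Writable open: make sure the DB directory and descriptor exist.
      // A read-only open must not create anything on disk.
      CreateDirIfMissing();
    }
    // Both modes replay the WAL into in-memory state from here on;
    // read_only simply skips any step that would write new files.
    (void)error_if_log_file_exist;
    return Status{};
  }

  void CreateDirIfMissing() { std::cout << "mkdir -p <dbname>\n"; }
};

int main() {
  RecoverySketch db;
  return db.Recover(/*read_only=*/true).ok() ? 0 : 1;
}
```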
@@ -967,11 +964,11 @@ Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table,
     // Recover in the order in which the logs were generated
     std::sort(logs.begin(), logs.end());
     for (const auto& log : logs) {
-      s = RecoverLogFile(log, edit, &max_sequence, external_table);
       // The previous incarnation may not have written any MANIFEST
       // records after allocating this log number. So we manually
       // update the file number allocation counter in VersionSet.
       versions_->MarkFileNumberUsed(log);
+      s = RecoverLogFile(log, &max_sequence, read_only);
     }
 
     if (s.ok()) {
@@ -986,10 +983,8 @@ Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table,
   return s;
 }
 
-Status DBImpl::RecoverLogFile(uint64_t log_number,
-                              VersionEdit* edit,
-                              SequenceNumber* max_sequence,
-                              MemTable* external_table) {
+Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
+                              bool read_only) {
   struct LogReporter : public log::Reader::Reporter {
     Env* env;
     Logger* info_log;
@@ -1006,6 +1001,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
   mutex_.AssertHeld();
 
+  VersionEdit edit;
+
   // Open the log file
   std::string fname = LogFileName(options_.wal_dir, log_number);
   unique_ptr<SequentialFile> file;
@@ -1035,11 +1032,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
   std::string scratch;
   Slice record;
   WriteBatch batch;
-  MemTable* mem = nullptr;
-  if (external_table) {
-    mem = external_table;
-  }
-  while (reader.ReadRecord(&record, &scratch) && status.ok()) {
+  bool memtable_empty = true;
+  while (reader.ReadRecord(&record, &scratch)) {
     if (record.size() < 12) {
       reporter.Corruption(
           record.size(), Status::Corruption("log record too small"));
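The record.size() < 12 check exists because a serialized WriteBatch begins with a fixed 12-byte header: an 8-byte little-endian sequence number followed by a 4-byte entry count, so a shorter record cannot even contain the header. A small self-contained sketch of decoding it (the ParseBatchHeader helper is illustrative, not a RocksDB API):

```cpp
// Sketch of why "record.size() < 12" is reported as corruption: a WriteBatch
// payload starts with a 12-byte header (8-byte sequence + 4-byte count).
#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>

struct BatchHeader {
  uint64_t sequence;
  uint32_t count;
};

bool ParseBatchHeader(const std::string& record, BatchHeader* out) {
  if (record.size() < 12) return false;           // "log record too small"
  std::memcpy(&out->sequence, record.data(), 8);  // assumes little-endian host
  std::memcpy(&out->count, record.data() + 8, 4);
  return true;
}

int main() {
  std::string rec(12, '\0');
  rec[0] = 42;  // sequence = 42
  rec[8] = 1;   // count = 1
  BatchHeader h;
  if (ParseBatchHeader(rec, &h)) {
    std::cout << "seq=" << h.sequence << " count=" << h.count << "\n";
  }
  return 0;
}
```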
@@ -1047,14 +1041,11 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
     }
     WriteBatchInternal::SetContents(&batch, record);
 
-    if (mem == nullptr) {
-      mem = new MemTable(internal_comparator_, options_);
-      mem->Ref();
-    }
-    status = WriteBatchInternal::InsertInto(&batch, mem, &options_);
+    status = WriteBatchInternal::InsertInto(&batch, mem_, &options_);
+    memtable_empty = false;
     MaybeIgnoreError(&status);
     if (!status.ok()) {
-      break;
+      return status;
     }
     const SequenceNumber last_seq =
         WriteBatchInternal::Sequence(&batch) +
@@ -1063,28 +1054,44 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
       *max_sequence = last_seq;
     }
 
-    if (!external_table &&
-        mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
-      status = WriteLevel0TableForRecovery(mem, edit);
+    if (!read_only &&
+        mem_->ApproximateMemoryUsage() > options_.write_buffer_size) {
+      status = WriteLevel0TableForRecovery(mem_, &edit);
+      // we still want to clear memtable, even if the recovery failed
+      delete mem_->Unref();
+      mem_ = new MemTable(internal_comparator_, options_);
+      mem_->Ref();
+      memtable_empty = true;
       if (!status.ok()) {
         // Reflect errors immediately so that conditions like full
         // file-systems cause the DB::Open() to fail.
-        break;
+        return status;
       }
-      delete mem->Unref();
-      mem = nullptr;
     }
   }
 
-  if (status.ok() && mem != nullptr && !external_table) {
-    status = WriteLevel0TableForRecovery(mem, edit);
-    // Reflect errors immediately so that conditions like full
-    // file-systems cause the DB::Open() to fail.
+  if (!memtable_empty && !read_only) {
+    status = WriteLevel0TableForRecovery(mem_, &edit);
+    delete mem_->Unref();
+    mem_ = new MemTable(internal_comparator_, options_);
+    mem_->Ref();
+    if (!status.ok()) {
+      return status;
+    }
   }
 
-  if (mem != nullptr && !external_table) {
-    delete mem->Unref();
+  if (edit.NumEntries() > 0) {
+    // if read_only, NumEntries() will be 0
+    assert(!read_only);
+    // writing log number in the manifest means that any log file
+    // with number strongly less than (log_number + 1) is already
+    // recovered and should be ignored on next reincarnation.
+    // Since we already recovered log_number, we want all logs
+    // with numbers `<= log_number` (includes this one) to be ignored
+    edit.SetLogNumber(log_number + 1);
+    status = versions_->LogAndApply(&edit, &mutex_);
   }
 
   return status;
 }
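Taken together, the new RecoverLogFile body follows a simple loop: replay each WAL record into the live memtable mem_; on writable opens, flush it to an L0 table and start a fresh memtable whenever it outgrows write_buffer_size; flush whatever is left at the end; and finally record log_number + 1 in the manifest so this log is skipped on the next open. A read-only open keeps the data in the memtable and never touches the manifest. The sketch below restates that flow with stub types; it is an illustration, not RocksDB's implementation.

```cpp
// Standalone sketch of the recovery loop above (stub types, not RocksDB's).
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

struct Status {
  bool ok_ = true;
  bool ok() const { return ok_; }
};

// Minimal stand-in for a memtable: only tracks approximate size.
struct MemTable {
  size_t bytes = 0;
  void Insert(const std::string& record) { bytes += record.size(); }
  size_t ApproximateMemoryUsage() const { return bytes; }
};

struct RecoverySketch {
  MemTable* mem_ = new MemTable;       // the DB's live memtable
  size_t write_buffer_size = 4 << 20;  // options_.write_buffer_size
  uint64_t manifest_log_number = 0;    // what LogAndApply would persist

  // Stand-in for WriteLevel0TableForRecovery.
  Status FlushMemTableToL0(MemTable* m) { (void)m; return Status{}; }

  Status ReplayLog(const std::vector<std::string>& records,
                   uint64_t log_number, bool read_only) {
    Status status;
    bool memtable_empty = true;
    bool wrote_l0 = false;
    for (const auto& record : records) {
      mem_->Insert(record);  // WriteBatchInternal::InsertInto(..., mem_, ...)
      memtable_empty = false;
      if (!read_only &&
          mem_->ApproximateMemoryUsage() > write_buffer_size) {
        status = FlushMemTableToL0(mem_);
        // Replace the memtable even if the flush failed, then surface the
        // error so a full filesystem makes the open fail instead of limping on.
        delete mem_;
        mem_ = new MemTable;
        memtable_empty = true;
        wrote_l0 = true;
        if (!status.ok()) return status;
      }
    }
    // Anything still buffered becomes one final L0 table (writable opens only).
    if (!memtable_empty && !read_only) {
      status = FlushMemTableToL0(mem_);
      delete mem_;
      mem_ = new MemTable;
      wrote_l0 = true;
      if (!status.ok()) return status;
    }
    // Recording log_number + 1 marks every log numbered <= log_number as
    // already recovered; a read-only open never reaches this point.
    if (wrote_l0) manifest_log_number = log_number + 1;
    return status;
  }
};

int main() {
  RecoverySketch db;
  return db.ReplayLog({"k1=v1", "k2=v2"}, /*log_number=*/7,
                      /*read_only=*/false).ok() ? 0 : 1;
}
```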
@@ -2579,9 +2586,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
   CompactionStats stats;
   stats.micros = env_->NowMicros() - start_micros - imm_micros;
-  if (options_.statistics.get()) {
-    options_.statistics.get()->measureTime(COMPACTION_TIME, stats.micros);
-  }
+  MeasureTime(options_.statistics.get(), COMPACTION_TIME, stats.micros);
   stats.files_in_leveln = compact->compaction->num_input_files(0);
   stats.files_in_levelnp1 = compact->compaction->num_input_files(1);
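The MeasureTime call replaces the open-coded null check; the helper is declared in util/statistics_imp.h (one of the conflicted files above). It presumably boils down to something like the following null-tolerant wrapper; the stub Statistics interface here is illustrative only.

```cpp
// Hedged sketch of a null-tolerant statistics helper in the spirit of
// MeasureTime/RecordTick; the real declarations live in util/statistics_imp.h.
#include <cstdint>
#include <memory>

enum Histograms : uint32_t { COMPACTION_TIME = 0 };

class Statistics {
 public:
  virtual ~Statistics() = default;
  virtual void measureTime(uint32_t histogram, uint64_t value) = 0;
};

// Callers pass options_.statistics.get(), which is often nullptr, so the
// helper centralizes the "is statistics enabled?" branch.
inline void MeasureTime(Statistics* statistics, Histograms histogram,
                        uint64_t value) {
  if (statistics) {
    statistics->measureTime(histogram, value);
  }
}

int main() {
  std::shared_ptr<Statistics> statistics;                 // stats disabled
  MeasureTime(statistics.get(), COMPACTION_TIME, 1234);   // safe no-op
  return 0;
}
```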
@@ -3106,8 +3111,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
         // have succeeded in memtable but Status reports error for all writes.
         throw std::runtime_error("In memory WriteBatch corruption!");
       }
-      SetTickerCount(options_.statistics.get(),
-                     SEQUENCE_NUMBER, last_sequence);
+      SetTickerCount(options_.statistics.get(), SEQUENCE_NUMBER,
+                     last_sequence);
     }
     StartPerfTimer(&pre_post_process_timer);
     if (updates == &tmp_batch_) tmp_batch_.Clear();
@@ -3249,15 +3254,15 @@ Status DBImpl::MakeRoomForWrite(bool force,
       // individual write by 0-1ms to reduce latency variance. Also,
       // this delay hands over some CPU to the compaction thread in
       // case it is sharing the same core as the writer.
+      uint64_t slowdown =
+          SlowdownAmount(versions_->current()->NumLevelFiles(0),
+                         options_.level0_slowdown_writes_trigger,
+                         options_.level0_stop_writes_trigger);
       mutex_.Unlock();
       uint64_t delayed;
       {
         StopWatch sw(env_, options_.statistics.get(), STALL_L0_SLOWDOWN_COUNT);
-        env_->SleepForMicroseconds(
-          SlowdownAmount(versions_->current()->NumLevelFiles(0),
-                         options_.level0_slowdown_writes_trigger,
-                         options_.level0_stop_writes_trigger)
-        );
+        env_->SleepForMicroseconds(slowdown);
         delayed = sw.ElapsedMicros();
       }
       RecordTick(options_.statistics.get(), STALL_L0_SLOWDOWN_MICROS, delayed);
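The rewrite computes the slowdown while the DB mutex is still held (inspecting versions_->current() requires the lock), then releases the lock before sleeping, so the StopWatch times only the sleep. The sketch below shows the same compute-under-lock, wait-outside-lock pattern with standard-library types; SlowdownMicros is a made-up stand-in for RocksDB's SlowdownAmount().

```cpp
// Sketch of the pattern in the hunk above: derive the delay from shared state
// while holding the lock, release the lock before sleeping, and time only the
// sleep. std::mutex/std::chrono stand in for RocksDB's port::Mutex, Env, and
// StopWatch.
#include <chrono>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>

std::mutex db_mutex;       // protects num_level0_files
int num_level0_files = 9;  // shared LSM state normally read via versions_

// Rough stand-in for SlowdownAmount(): scale the delay between the slowdown
// trigger and the stop trigger (the real formula is RocksDB-specific).
uint64_t SlowdownMicros(int level0_files, int slowdown_trigger, int stop_trigger) {
  if (level0_files <= slowdown_trigger) return 0;
  if (level0_files >= stop_trigger) return 1000;
  return 1000ULL * (level0_files - slowdown_trigger) /
         (stop_trigger - slowdown_trigger);
}

int main() {
  std::unique_lock<std::mutex> lock(db_mutex);
  // Compute while locked: reading the LSM shape requires the DB mutex.
  uint64_t slowdown = SlowdownMicros(num_level0_files, 8, 12);
  lock.unlock();  // never sleep while holding the DB mutex

  auto start = std::chrono::steady_clock::now();
  std::this_thread::sleep_for(std::chrono::microseconds(slowdown));
  auto delayed = std::chrono::duration_cast<std::chrono::microseconds>(
                     std::chrono::steady_clock::now() - start).count();
  std::cout << "stalled writer for " << delayed << "us\n";
  return 0;
}
```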
@@ -3874,9 +3879,8 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
     delete impl;
     return s;
   }
-  impl->mutex_.Lock(); // DBImpl::Recover() requires lock being held
-  VersionEdit edit;
-  s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
+  impl->mutex_.Lock();
+  s = impl->Recover(); // Handles create_if_missing, error_if_exists
   if (s.ok()) {
     uint64_t new_log_number = impl->versions_->NewFileNumber();
     unique_ptr<WritableFile> lfile;
@@ -3888,6 +3892,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
     );
     if (s.ok()) {
       lfile->SetPreallocationBlockSize(1.1 * impl->options_.write_buffer_size);
+      VersionEdit edit;
       edit.SetLogNumber(new_log_number);
       impl->logfile_number_ = new_log_number;
       impl->log_.reset(new log::Writer(std::move(lfile)));