Mirror of https://github.com/XRPLF/rippled.git, synced 2025-12-06 17:27:55 +00:00
Merge branch 'master' into columnfamilies
Conflicts:
    db/db_impl.cc
    db/db_impl_readonly.cc
    db/db_test.cc
    db/version_edit.cc
    db/version_edit.h
    db/version_set.cc
    db/version_set.h
    db/version_set_reduce_num_levels.cc
db/db_impl.cc (293 changed lines)
@@ -57,6 +57,7 @@
 #include "util/mutexlock.h"
 #include "util/perf_context_imp.h"
 #include "util/stop_watch.h"
+#include "util/autovector.h"
 
 namespace rocksdb {
 
@@ -254,8 +255,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
     : env_(options.env),
       dbname_(dbname),
       internal_comparator_(options.comparator),
-      options_(SanitizeOptions(
-          dbname, &internal_comparator_, &internal_filter_policy_, options)),
+      options_(SanitizeOptions(dbname, &internal_comparator_,
+                               &internal_filter_policy_, options)),
       internal_filter_policy_(options.filter_policy),
       owns_info_log_(options_.info_log != options.info_log),
       db_lock_(nullptr),
@@ -263,8 +264,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
       shutting_down_(nullptr),
       bg_cv_(&mutex_),
       mem_rep_factory_(options_.memtable_factory.get()),
-      mem_(new MemTable(internal_comparator_, mem_rep_factory_,
-                        NumberLevels(), options_)),
+      mem_(new MemTable(internal_comparator_, options_)),
       logfile_number_(0),
       super_version_(nullptr),
       tmp_batch_(),
@@ -410,7 +410,7 @@ uint64_t DBImpl::TEST_Current_Manifest_FileNo() {
 }
 
 Status DBImpl::NewDB() {
-  VersionEdit new_db(NumberLevels());
+  VersionEdit new_db;
   new_db.SetComparatorName(user_comparator()->Name());
   new_db.SetLogNumber(0);
   new_db.SetNextFile(2);
@@ -1048,8 +1048,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
       WriteBatchInternal::SetContents(&batch, record);
 
       if (mem == nullptr) {
-        mem = new MemTable(internal_comparator_, mem_rep_factory_,
-                           NumberLevels(), options_);
+        mem = new MemTable(internal_comparator_, options_);
         mem->Ref();
       }
       status = WriteBatchInternal::InsertInto(&batch, mem, &options_);
@@ -1300,6 +1299,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
 void DBImpl::CompactRange(const ColumnFamilyHandle& column_family,
                           const Slice* begin, const Slice* end,
                           bool reduce_level, int target_level) {
+  FlushMemTable(FlushOptions());
   int max_level_with_files = 1;
   {
     MutexLock l(&mutex_);
@@ -1310,9 +1310,15 @@ void DBImpl::CompactRange(const ColumnFamilyHandle& column_family,
       }
     }
   }
-  TEST_FlushMemTable(); // TODO(sanjay): Skip if memtable does not overlap
-  for (int level = 0; level < max_level_with_files; level++) {
-    TEST_CompactRange(level, begin, end);
+  for (int level = 0; level <= max_level_with_files; level++) {
+    // in case the compaction is unversal or if we're compacting the
+    // bottom-most level, the output level will be the same as input one
+    if (options_.compaction_style == kCompactionStyleUniversal ||
+        level == max_level_with_files) {
+      RunManualCompaction(level, level, begin, end);
+    } else {
+      RunManualCompaction(level, level + 1, begin, end);
+    }
   }
 
   if (reduce_level) {
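The behavioral change in this hunk is how the output level is chosen per pass. The following is a minimal standalone sketch of that rule, with hypothetical names standing in for the RocksDB types; it is an illustration, not RocksDB code:

#include <cassert>

// Sketch of the output-level rule in the rewritten CompactRange loop:
// universal compaction and the bottom-most populated level compact onto
// the input level itself; everything else compacts into the next level.
enum CompactionStyle { kStyleLevel, kStyleUniversal };  // stand-in enum

int OutputLevelFor(CompactionStyle style, int level, int max_level_with_files) {
  if (style == kStyleUniversal || level == max_level_with_files) {
    return level;     // output stays on the input level
  }
  return level + 1;   // leveled compaction pushes data one level down
}

int main() {
  assert(OutputLevelFor(kStyleLevel, 0, 3) == 1);      // L0 -> L1
  assert(OutputLevelFor(kStyleLevel, 3, 3) == 3);      // bottom-most: in place
  assert(OutputLevelFor(kStyleUniversal, 0, 3) == 0);  // universal: in place
  return 0;
}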
@@ -1324,13 +1330,13 @@ void DBImpl::CompactRange(const ColumnFamilyHandle& column_family,
 // return the same level if it cannot be moved
 int DBImpl::FindMinimumEmptyLevelFitting(int level) {
   mutex_.AssertHeld();
+  Version* current = versions_->current();
   int minimum_level = level;
   for (int i = level - 1; i > 0; --i) {
     // stop if level i is not empty
-    if (versions_->NumLevelFiles(i) > 0) break;
+    if (current->NumLevelFiles(i) > 0) break;
 
     // stop if level i is too small (cannot fit the level files)
-    if (versions_->MaxBytesForLevel(i) < versions_->NumLevelBytes(level)) break;
+    if (versions_->MaxBytesForLevel(i) < current->NumLevelBytes(level)) break;
 
     minimum_level = i;
   }
@@ -1376,7 +1382,7 @@ void DBImpl::ReFitLevel(int level, int target_level) {
   Log(options_.info_log, "Before refitting:\n%s",
       versions_->current()->DebugString().data());
 
-  VersionEdit edit(NumberLevels());
+  VersionEdit edit;
   for (const auto& f : versions_->current()->files_[level]) {
     edit.DeleteFile(level, f->number);
     edit.AddFile(to_level, f->number, f->file_size, f->smallest, f->largest,
@@ -1612,13 +1618,17 @@ Status DBImpl::AppendSortedWalsOfType(const std::string& path,
   return status;
 }
 
-void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
-  assert(level >= 0);
+void DBImpl::RunManualCompaction(int input_level,
+                                 int output_level,
+                                 const Slice* begin,
+                                 const Slice* end) {
+  assert(input_level >= 0);
 
   InternalKey begin_storage, end_storage;
 
   ManualCompaction manual;
-  manual.level = level;
+  manual.input_level = input_level;
+  manual.output_level = output_level;
   manual.done = false;
   manual.in_progress = false;
   // For universal compaction, we enforce every manual compaction to compact
@@ -1646,11 +1656,11 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
   // can compact any range of keys/files.
   //
   // bg_manual_only_ is non-zero when at least one thread is inside
-  // TEST_CompactRange(), i.e. during that time no other compaction will
+  // RunManualCompaction(), i.e. during that time no other compaction will
   // get scheduled (see MaybeScheduleFlushOrCompaction).
   //
   // Note that the following loop doesn't stop more that one thread calling
-  // TEST_CompactRange() from getting to the second while loop below.
+  // RunManualCompaction() from getting to the second while loop below.
   // However, only one of them will actually schedule compaction, while
   // others will wait on a condition variable until it completes.
 
@@ -1680,6 +1690,15 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
   --bg_manual_only_;
 }
 
+void DBImpl::TEST_CompactRange(int level,
+                               const Slice* begin,
+                               const Slice* end) {
+  int output_level = (options_.compaction_style == kCompactionStyleUniversal)
+                         ? level
+                         : level + 1;
+  RunManualCompaction(level, output_level, begin, end);
+}
+
 Status DBImpl::FlushMemTable(const FlushOptions& options) {
   // nullptr batch means just wait for earlier writes to be done
   Status s = Write(WriteOptions(), nullptr);
@@ -1825,6 +1844,11 @@ void DBImpl::TEST_PurgeObsoleteteWAL() {
   PurgeObsoleteWALFiles();
 }
 
+uint64_t DBImpl::TEST_GetLevel0TotalSize() {
+  MutexLock l(&mutex_);
+  return versions_->current()->NumLevelBytes(0);
+}
+
 void DBImpl::BackgroundCallCompaction() {
   bool madeProgress = false;
   DeletionState deletion_state(options_.max_write_buffer_number, true);
@@ -1899,23 +1923,27 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
   unique_ptr<Compaction> c;
   bool is_manual = (manual_compaction_ != nullptr) &&
                    (manual_compaction_->in_progress == false);
-  InternalKey manual_end;
+  InternalKey manual_end_storage;
+  InternalKey* manual_end = &manual_end_storage;
   if (is_manual) {
     ManualCompaction* m = manual_compaction_;
     assert(!m->in_progress);
     m->in_progress = true; // another thread cannot pick up the same work
-    c.reset(versions_->CompactRange(m->level, m->begin, m->end));
-    if (c) {
-      manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
-    } else {
+    c.reset(versions_->CompactRange(
+        m->input_level, m->output_level, m->begin, m->end, &manual_end));
+    if (!c) {
       m->done = true;
     }
     Log(options_.info_log,
-        "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
-        m->level,
+        "Manual compaction from level-%d to level-%d from %s .. %s; will stop "
+        "at %s\n",
+        m->input_level,
+        m->output_level,
         (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
         (m->end ? m->end->DebugString().c_str() : "(end)"),
-        (m->done ? "(end)" : manual_end.DebugString().c_str()));
+        ((m->done || manual_end == nullptr)
+             ? "(end)"
+             : manual_end->DebugString().c_str()));
   } else if (!options_.disable_auto_compactions) {
     c.reset(versions_->PickCompaction());
   }
@@ -1934,13 +1962,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
                        f->smallest_seqno, f->largest_seqno);
     status = versions_->LogAndApply(c->edit(), &mutex_);
     InstallSuperVersion(deletion_state);
-    VersionSet::LevelSummaryStorage tmp;
+    Version::LevelSummaryStorage tmp;
     Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
-        static_cast<unsigned long long>(f->number),
-        c->level() + 1,
+        static_cast<unsigned long long>(f->number), c->level() + 1,
         static_cast<unsigned long long>(f->file_size),
-        status.ToString().c_str(),
-        versions_->LevelSummary(&tmp));
+        status.ToString().c_str(), versions_->current()->LevelSummary(&tmp));
     versions_->ReleaseCompactionFiles(c.get(), status);
     *madeProgress = true;
   } else {
@@ -1980,13 +2006,19 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
     // Also note that, if we don't stop here, then the current compaction
     // writes a new file back to level 0, which will be used in successive
    // compaction. Hence the manual compaction will never finish.
-    if (options_.compaction_style == kCompactionStyleUniversal) {
+    //
+    // Stop the compaction if manual_end points to nullptr -- this means
+    // that we compacted the whole range. manual_end should always point
+    // to nullptr in case of universal compaction
+    if (manual_end == nullptr) {
      m->done = true;
     }
     if (!m->done) {
       // We only compacted part of the requested range. Update *m
       // to the range that is left to be compacted.
-      m->tmp_storage = manual_end;
+      // Universal compaction should always compact the whole range
+      assert(options_.compaction_style != kCompactionStyleUniversal);
+      m->tmp_storage = *manual_end;
       m->begin = &m->tmp_storage;
     }
     m->in_progress = false; // not being processed anymore
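After this hunk, the manual_end pointer doubles as a completion signal: nullptr means the whole requested range was compacted, otherwise it marks where the next round should resume. A standalone sketch of that convention, with std::string standing in for InternalKey:

#include <cassert>
#include <string>

// Sketch of the resume convention: a null `manual_end` means the whole
// range is done; otherwise the remaining range starts at `manual_end`.
struct ManualRange {
  std::string begin;
  std::string end;
  bool done = false;
};

void AdvanceManualRange(ManualRange* m, const std::string* manual_end) {
  if (manual_end == nullptr) {
    m->done = true;           // compacted everything that was requested
  } else {
    m->begin = *manual_end;   // next compaction resumes where this one stopped
  }
}

int main() {
  ManualRange m{"a", "z"};
  std::string stopped_at = "k";
  AdvanceManualRange(&m, &stopped_at);
  assert(!m.done && m.begin == "k");  // partial pass: the range shrinks
  AdvanceManualRange(&m, nullptr);
  assert(m.done);                     // final pass: done flag set
  return 0;
}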
@@ -2018,14 +2050,14 @@ void DBImpl::CleanupCompaction(CompactionState* compact, Status status) {
 }
 
 // Allocate the file numbers for the output file. We allocate as
-// many output file numbers as there are files in level+1.
+// many output file numbers as there are files in level+1 (at least one)
 // Insert them into pending_outputs so that they do not get deleted.
 void DBImpl::AllocateCompactionOutputFileNumbers(CompactionState* compact) {
   mutex_.AssertHeld();
   assert(compact != nullptr);
   assert(compact->builder == nullptr);
   int filesNeeded = compact->compaction->num_input_files(1);
-  for (int i = 0; i < filesNeeded; i++) {
+  for (int i = 0; i < std::max(filesNeeded, 1); i++) {
     uint64_t file_number = versions_->NewFileNumber();
     pending_outputs_.insert(file_number);
     compact->allocated_file_numbers.push_back(file_number);
@@ -2169,14 +2201,11 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
 
   // Add compaction outputs
   compact->compaction->AddInputDeletions(compact->compaction->edit());
-  const int level = compact->compaction->level();
   for (size_t i = 0; i < compact->outputs.size(); i++) {
     const CompactionState::Output& out = compact->outputs[i];
     compact->compaction->edit()->AddFile(
-        (options_.compaction_style == kCompactionStyleUniversal) ?
-          level : level + 1,
-        out.number, out.file_size, out.smallest, out.largest,
-        out.smallest_seqno, out.largest_seqno);
+        compact->compaction->output_level(), out.number, out.file_size,
+        out.smallest, out.largest, out.smallest_seqno, out.largest_seqno);
   }
   return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
 }
@@ -2218,14 +2247,14 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
       compact->compaction->num_input_files(0),
       compact->compaction->level(),
       compact->compaction->num_input_files(1),
-      compact->compaction->level() + 1,
+      compact->compaction->output_level(),
       compact->compaction->score(),
       options_.max_background_compactions - bg_compaction_scheduled_);
   char scratch[256];
   compact->compaction->Summary(scratch, sizeof(scratch));
   Log(options_.info_log, "Compaction start summary: %s\n", scratch);
 
-  assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
+  assert(versions_->current()->NumLevelFiles(compact->compaction->level()) > 0);
   assert(compact->builder == nullptr);
   assert(!compact->outfile);
 
@@ -2553,9 +2582,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
 
   CompactionStats stats;
   stats.micros = env_->NowMicros() - start_micros - imm_micros;
-  if (options_.statistics.get()) {
-    options_.statistics.get()->measureTime(COMPACTION_TIME, stats.micros);
-  }
+  MeasureTime(options_.statistics.get(), COMPACTION_TIME, stats.micros);
   stats.files_in_leveln = compact->compaction->num_input_files(0);
   stats.files_in_levelnp1 = compact->compaction->num_input_files(1);
 
@@ -2597,22 +2624,21 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
     status = InstallCompactionResults(compact);
     InstallSuperVersion(deletion_state);
   }
-  VersionSet::LevelSummaryStorage tmp;
+  Version::LevelSummaryStorage tmp;
   Log(options_.info_log,
       "compacted to: %s, %.1f MB/sec, level %d, files in(%d, %d) out(%d) "
      "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
      "write-amplify(%.1f) %s\n",
-      versions_->LevelSummary(&tmp),
+      versions_->current()->LevelSummary(&tmp),
       (stats.bytes_readn + stats.bytes_readnp1 + stats.bytes_written) /
-          (double) stats.micros,
-      compact->compaction->output_level(),
-      stats.files_in_leveln, stats.files_in_levelnp1, stats.files_out_levelnp1,
-      stats.bytes_readn / 1048576.0,
-      stats.bytes_readnp1 / 1048576.0,
+          (double)stats.micros,
+      compact->compaction->output_level(), stats.files_in_leveln,
+      stats.files_in_levelnp1, stats.files_out_levelnp1,
+      stats.bytes_readn / 1048576.0, stats.bytes_readnp1 / 1048576.0,
       stats.bytes_written / 1048576.0,
       (stats.bytes_written + stats.bytes_readnp1 + stats.bytes_readn) /
-          (double) stats.bytes_readn,
-      stats.bytes_written / (double) stats.bytes_readn,
+          (double)stats.bytes_readn,
+      stats.bytes_written / (double)stats.bytes_readn,
      status.ToString().c_str());
 
   return status;
@@ -2649,38 +2675,40 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
 Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
                                       SequenceNumber* latest_snapshot) {
   IterState* cleanup = new IterState;
-  mutex_.Lock();
-  *latest_snapshot = versions_->LastSequence();
+  MemTable* mutable_mem;
+  std::vector<MemTable*> immutables;
+  Version* version;
 
-  // Collect together all needed child iterators for mem
-  std::vector<Iterator*> list;
+  mutex_.Lock();
+  *latest_snapshot = versions_->LastSequence();
   mem_->Ref();
-  list.push_back(mem_->NewIterator(options));
-
-  cleanup->mem.push_back(mem_);
-
+  mutable_mem = mem_;
   // Collect together all needed child iterators for imm_
-  std::vector<MemTable*> immutables;
   imm_.GetMemTables(&immutables);
   for (unsigned int i = 0; i < immutables.size(); i++) {
-    MemTable* m = immutables[i];
-    m->Ref();
+    immutables[i]->Ref();
   }
-  // Collect iterators for files in L0 - Ln
-  versions_->current()->AddIterators(options, storage_options_, &list);
+  versions_->current()->Ref();
+  version = versions_->current();
+  mutex_.Unlock();
+
+  std::vector<Iterator*> list;
+  list.push_back(mutable_mem->NewIterator(options));
+  cleanup->mem.push_back(mutable_mem);
+  for (MemTable* m : immutables) {
+    list.push_back(m->NewIterator(options));
+    cleanup->mem.push_back(m);
+  }
+
+  // Collect iterators for files in L0 - Ln
+  version->AddIterators(options, storage_options_, &list);
   Iterator* internal_iter =
       NewMergingIterator(&internal_comparator_, &list[0], list.size());
-  versions_->current()->Ref();
 
+  cleanup->version = version;
   cleanup->mu = &mutex_;
   cleanup->db = this;
-  cleanup->version = versions_->current();
   internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
 
-  mutex_.Unlock();
   return internal_iter;
 }
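This restructuring shortens the critical section: references are taken while the mutex is held, and the allocation-heavy iterator construction happens after unlocking. A generic sketch of that pattern; the Resource type is a hypothetical stand-in for MemTable/Version:

#include <mutex>
#include <vector>

// Generic sketch of the ref-then-unlock pattern: pin shared objects with
// a refcount inside a short critical section, build iterators outside it.
struct Resource {
  int refs = 0;
  void Ref() { ++refs; }
  int NewIterator() { return 0; }  // placeholder for real iterator creation
};

std::vector<int> BuildIterators(std::mutex& mu, std::vector<Resource*>& shared) {
  std::vector<Resource*> pinned;
  {
    std::lock_guard<std::mutex> l(mu);  // critical section: only Ref() calls
    for (Resource* r : shared) {
      r->Ref();
      pinned.push_back(r);
    }
  }
  std::vector<int> iters;  // expensive construction runs without the lock
  for (Resource* r : pinned) {
    iters.push_back(r->NewIterator());
  }
  return iters;
}

int main() {
  std::mutex mu;
  Resource a, b;
  std::vector<Resource*> shared{&a, &b};
  return BuildIterators(mu, shared).size() == 2 ? 0 : 1;
}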
@@ -2691,7 +2719,7 @@ Iterator* DBImpl::TEST_NewInternalIterator() {
 
 int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
   MutexLock l(&mutex_);
-  return versions_->MaxNextLevelOverlappingBytes();
+  return versions_->current()->MaxNextLevelOverlappingBytes();
 }
 
 Status DBImpl::Get(const ReadOptions& options,
@@ -2898,7 +2926,7 @@ std::vector<Status> DBImpl::MultiGet(
 Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
                                   const std::string& column_family_name,
                                   ColumnFamilyHandle* handle) {
-  VersionEdit edit(0);
+  VersionEdit edit;
   edit.AddColumnFamily(column_family_name);
   MutexLock l(&mutex_);
   ++versions_->max_column_family_;
@@ -2920,7 +2948,7 @@ Status DBImpl::DropColumnFamily(const ColumnFamilyHandle& column_family) {
   if (column_family.id == 0) {
     return Status::InvalidArgument("Can't drop default column family");
   }
-  VersionEdit edit(0);
+  VersionEdit edit;
   edit.DropColumnFamily();
   edit.SetColumnFamily(column_family.id);
   MutexLock l(&mutex_);
@@ -3045,12 +3073,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
   uint64_t last_sequence = versions_->LastSequence();
   Writer* last_writer = &w;
   if (status.ok() && my_batch != nullptr) {  // nullptr batch is for compactions
-    // TODO: BuildBatchGroup physically concatenate/copy all write batches into
-    // a new one. Mem copy is done with the lock held. Ideally, we only need
-    // the lock to obtain the last_writer and the references to all batches.
-    // Creation (copy) of the merged batch could have been done outside of the
-    // lock protected region.
-    WriteBatch* updates = BuildBatchGroup(&last_writer);
+    autovector<WriteBatch*> write_batch_group;
+    BuildBatchGroup(&last_writer, &write_batch_group);
 
     // Add to log and apply to memtable. We can release the lock
     // during this phase since &w is currently responsible for logging
@@ -3058,6 +3082,16 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
     // into mem_.
     {
       mutex_.Unlock();
+      WriteBatch* updates = nullptr;
+      if (write_batch_group.size() == 1) {
+        updates = write_batch_group[0];
+      } else {
+        updates = &tmp_batch_;
+        for (size_t i = 0; i < write_batch_group.size(); ++i) {
+          WriteBatchInternal::Append(updates, write_batch_group[i]);
+        }
+      }
+
       const SequenceNumber current_sequence = last_sequence + 1;
       WriteBatchInternal::SetSequence(updates, current_sequence);
       int my_batch_count = WriteBatchInternal::Count(updates);
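With the group now carried as pointers, the merge into tmp_batch_ is deferred until after the mutex is released, and skipped entirely for a group of one. A sketch of that decision, with std::string standing in for WriteBatch:

#include <cassert>
#include <string>
#include <vector>

// Sketch of the single-vs-merged decision: a one-batch group is used as-is,
// a larger group is concatenated into a reusable scratch batch.
std::string* PickUpdates(std::vector<std::string*>& group, std::string* scratch) {
  if (group.size() == 1) {
    return group[0];        // common case: no copy at all
  }
  scratch->clear();
  for (std::string* b : group) {
    scratch->append(*b);    // stands in for WriteBatchInternal::Append
  }
  return scratch;
}

int main() {
  std::string a = "put:a;", b = "put:b;", scratch;
  std::vector<std::string*> solo{&a};
  assert(PickUpdates(solo, &scratch) == &a);       // caller's batch used as-is
  std::vector<std::string*> pair{&a, &b};
  assert(PickUpdates(pair, &scratch) == &scratch); // merged into scratch
  assert(scratch == "put:a;put:b;");
  return 0;
}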
@@ -3100,15 +3134,15 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
           // have succeeded in memtable but Status reports error for all writes.
           throw std::runtime_error("In memory WriteBatch corruption!");
         }
-        SetTickerCount(options_.statistics.get(),
-                       SEQUENCE_NUMBER, last_sequence);
+        SetTickerCount(options_.statistics.get(), SEQUENCE_NUMBER,
+                       last_sequence);
       }
-      if (updates == &tmp_batch_) tmp_batch_.Clear();
       mutex_.Lock();
       if (status.ok()) {
         versions_->SetLastSequence(last_sequence);
       }
+      if (updates == &tmp_batch_) tmp_batch_.Clear();
     }
     if (options_.paranoid_checks && !status.ok() && bg_error_.ok()) {
       bg_error_ = status; // stop compaction & fail any further writes
@@ -3136,13 +3170,14 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
 
 // REQUIRES: Writer list must be non-empty
 // REQUIRES: First writer must have a non-nullptr batch
-WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
+void DBImpl::BuildBatchGroup(Writer** last_writer,
+                             autovector<WriteBatch*>* write_batch_group) {
   assert(!writers_.empty());
   Writer* first = writers_.front();
-  WriteBatch* result = first->batch;
-  assert(result != nullptr);
+  assert(first->batch != nullptr);
 
   size_t size = WriteBatchInternal::ByteSize(first->batch);
+  write_batch_group->push_back(first->batch);
 
   // Allow the group to grow up to a maximum size, but if the
   // original write is small, limit the growth so we do not slow
@@ -3175,18 +3210,10 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
       break;
     }
 
-      // Append to *reuslt
-      if (result == first->batch) {
-        // Switch to temporary batch instead of disturbing caller's batch
-        result = &tmp_batch_;
-        assert(WriteBatchInternal::Count(result) == 0);
-        WriteBatchInternal::Append(result, first->batch);
-      }
-      WriteBatchInternal::Append(result, w->batch);
+      write_batch_group->push_back(w->batch);
     }
     *last_writer = w;
   }
-  return result;
 }
 
 // This function computes the amount of time in microseconds by which a write
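BuildBatchGroup's new contract is visible here: it only collects pointers to the followers' batches, so the memcpy that used to run under the mutex is gone from this function entirely. A sketch of a collection loop under that contract; the byte budget is illustrative, not the exact RocksDB formula:

#include <cstddef>
#include <vector>

// Sketch of pointer-only batch grouping: walk the writer queue, stop at a
// batchless (sync) writer or when the byte budget is exhausted, and record
// pointers only -- no copying while the lock is held.
struct Writer {
  std::vector<char>* batch;  // nullptr marks a batchless writer
};

void BuildGroup(const std::vector<Writer*>& writers, size_t max_bytes,
                std::vector<std::vector<char>*>* group) {
  size_t size = 0;
  for (Writer* w : writers) {
    if (w->batch == nullptr) break;                  // stop the group here
    size += w->batch->size();
    if (!group->empty() && size > max_bytes) break;  // budget exhausted
    group->push_back(w->batch);                      // pointer only, no copy
  }
}

int main() {
  std::vector<char> b1(10), b2(10), b3(1000);
  Writer w1{&b1}, w2{&b2}, w3{&b3};
  std::vector<Writer*> queue{&w1, &w2, &w3};
  std::vector<std::vector<char>*> group;
  BuildGroup(queue, 100, &group);
  return group.size() == 2 ? 0 : 1;  // b3 would blow the 100-byte budget
}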
@@ -3200,7 +3227,7 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
 // The goal of this formula is to gradually increase the rate at which writes
 // are slowed. We also tried linear delay (r * 1000), but it seemed to do
 // slightly worse. There is no other particular reason for choosing quadratic.
-uint64_t DBImpl::SlowdownAmount(int n, int top, int bottom) {
+uint64_t DBImpl::SlowdownAmount(int n, double bottom, double top) {
   uint64_t delay;
   if (n >= top) {
     delay = 1000;
@@ -3212,10 +3239,10 @@ uint64_t DBImpl::SlowdownAmount(int n, int top, int bottom) {
     // If we are here, we know that:
     //   level0_start_slowdown <= n < level0_slowdown
     // since the previous two conditions are false.
-    float how_much =
-      (float) (n - bottom) /
+    double how_much =
+      (double) (n - bottom) /
       (top - bottom);
-    delay = how_much * how_much * 1000;
+    delay = std::max(how_much * how_much * 1000, 100.0);
   }
   assert(delay <= 1000);
   return delay;
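Note that the new signature also renames the parameters to match how the call sites pass them: bottom is level0_slowdown_writes_trigger and top is level0_stop_writes_trigger. A standalone copy of the revised ramp with a worked example; the trigger values 8 and 12 are assumptions for illustration, and the n <= bottom branch is inferred from context since the diff does not show it:

#include <algorithm>
#include <cassert>
#include <cstdint>

// Standalone sketch of the revised quadratic slowdown: 0 below `bottom`,
// 1000us at or above `top`, and a quadratic ramp with a 100us floor between.
uint64_t SlowdownAmount(int n, double bottom, double top) {
  uint64_t delay;
  if (n >= top) {
    delay = 1000;
  } else if (n <= bottom) {
    delay = 0;  // assumed branch, not shown in the hunk above
  } else {
    double how_much = (double)(n - bottom) / (top - bottom);
    delay = std::max(how_much * how_much * 1000, 100.0);
  }
  assert(delay <= 1000);
  return delay;
}

int main() {
  // Triggers at 8 (slowdown) and 12 (stop): halfway gives 0.5^2 * 1000 = 250us;
  // just past the slowdown trigger, 0.25^2 * 1000 = 62.5us is floored to 100us.
  assert(SlowdownAmount(10, 8, 12) == 250);
  assert(SlowdownAmount(9, 8, 12) == 100);
  assert(SlowdownAmount(13, 8, 12) == 1000);
  return 0;
}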
@@ -3240,25 +3267,22 @@ Status DBImpl::MakeRoomForWrite(bool force,
       // Yield previous error
       s = bg_error_;
       break;
-    } else if (
-        allow_delay &&
-        versions_->NumLevelFiles(0) >=
-          options_.level0_slowdown_writes_trigger) {
+    } else if (allow_delay && versions_->NeedSlowdownForNumLevel0Files()) {
       // We are getting close to hitting a hard limit on the number of
      // L0 files. Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 0-1ms to reduce latency variance. Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
+      uint64_t slowdown =
+          SlowdownAmount(versions_->current()->NumLevelFiles(0),
+                         options_.level0_slowdown_writes_trigger,
+                         options_.level0_stop_writes_trigger);
       mutex_.Unlock();
       uint64_t delayed;
       {
         StopWatch sw(env_, options_.statistics.get(), STALL_L0_SLOWDOWN_COUNT);
-        env_->SleepForMicroseconds(
-            SlowdownAmount(versions_->NumLevelFiles(0),
-                           options_.level0_slowdown_writes_trigger,
-                           options_.level0_stop_writes_trigger)
-        );
+        env_->SleepForMicroseconds(slowdown);
         delayed = sw.ElapsedMicros();
       }
       RecordTick(options_.statistics.get(), STALL_L0_SLOWDOWN_MICROS, delayed);
@@ -3290,7 +3314,7 @@ Status DBImpl::MakeRoomForWrite(bool force,
                  STALL_MEMTABLE_COMPACTION_MICROS, stall);
       stall_memtable_compaction_ += stall;
       stall_memtable_compaction_count_++;
-    } else if (versions_->NumLevelFiles(0) >=
+    } else if (versions_->current()->NumLevelFiles(0) >=
                options_.level0_stop_writes_trigger) {
       // There are too many level-0 files.
       DelayLoggingAndReset();
@@ -3366,17 +3390,13 @@ Status DBImpl::MakeRoomForWrite(bool force,
       EnvOptions soptions(storage_options_);
       soptions.use_mmap_writes = false;
       DelayLoggingAndReset();
-      s = env_->NewWritableFile(
-            LogFileName(options_.wal_dir, new_log_number),
-            &lfile,
-            soptions
-          );
+      s = env_->NewWritableFile(LogFileName(options_.wal_dir, new_log_number),
+                                &lfile, soptions);
       if (s.ok()) {
         // Our final size should be less than write_buffer_size
         // (compression, etc) but err on the side of caution.
         lfile->SetPreallocationBlockSize(1.1 * options_.write_buffer_size);
-        memtmp = new MemTable(
-          internal_comparator_, mem_rep_factory_, NumberLevels(), options_);
+        memtmp = new MemTable(internal_comparator_, options_);
         new_superversion = new SuperVersion(options_.max_write_buffer_number);
       }
     }
@@ -3426,6 +3446,7 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
   value->clear();
 
   MutexLock l(&mutex_);
+  Version* current = versions_->current();
   Slice in = property;
   Slice prefix("rocksdb.");
   if (!in.starts_with(prefix)) return false;
@@ -3440,7 +3461,7 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
     } else {
       char buf[100];
       snprintf(buf, sizeof(buf), "%d",
-               versions_->NumLevelFiles(static_cast<int>(level)));
+               current->NumLevelFiles(static_cast<int>(level)));
       *value = buf;
       return true;
     }
@@ -3455,8 +3476,8 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
       snprintf(buf, sizeof(buf),
                "%3d %8d %8.0f\n",
               level,
-               versions_->NumLevelFiles(level),
-               versions_->NumLevelBytes(level) / 1048576.0);
+               current->NumLevelFiles(level),
+               current->NumLevelBytes(level) / 1048576.0);
       value->append(buf);
     }
     return true;
@@ -3499,8 +3520,8 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
         "--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n"
     );
     value->append(buf);
-    for (int level = 0; level < NumberLevels(); level++) {
-      int files = versions_->NumLevelFiles(level);
+    for (int level = 0; level < current->NumberLevels(); level++) {
+      int files = current->NumLevelFiles(level);
       if (stats_[level].micros > 0 || files > 0) {
         int64_t bytes_read = stats_[level].bytes_readn +
                              stats_[level].bytes_readnp1;
@@ -3521,8 +3542,8 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
             "%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %9.1f %9lu\n",
             level,
             files,
-            versions_->NumLevelBytes(level) / 1048576.0,
-            versions_->NumLevelBytes(level) /
+            current->NumLevelBytes(level) / 1048576.0,
+            current->NumLevelBytes(level) /
                 versions_->MaxBytesForLevel(level),
             stats_[level].micros / 1e6,
             bytes_read / 1048576.0,
@@ -3758,7 +3779,7 @@ Status DBImpl::DeleteFile(std::string name) {
   int level;
   FileMetaData metadata;
   int maxlevel = NumberLevels();
-  VersionEdit edit(maxlevel);
+  VersionEdit edit;
   DeletionState deletion_state(0, true);
   {
     MutexLock l(&mutex_);
@@ -3781,7 +3802,7 @@ Status DBImpl::DeleteFile(std::string name) {
     // This is to make sure that any deletion tombstones are not
     // lost. Check that the level passed is the last level.
     for (int i = level + 1; i < maxlevel; i++) {
-      if (versions_->NumLevelFiles(i) != 0) {
+      if (versions_->current()->NumLevelFiles(i) != 0) {
         Log(options_.info_log,
             "DeleteFile %s FAILED. File not in last level\n", name.c_str());
         return Status::InvalidArgument("File not in last level");
@@ -3836,7 +3857,10 @@ Status DBImpl::GetDbIdentity(std::string& identity) {
 // can call if they wish
 Status DB::Put(const WriteOptions& opt, const ColumnFamilyHandle& column_family,
                const Slice& key, const Slice& value) {
-  WriteBatch batch;
+  // Pre-allocate size of write batch conservatively.
+  // 8 bytes are taken by header, 4 bytes for count, 1 byte for type,
+  // and we allocate 11 extra bytes for key length, as well as value length.
+  WriteBatch batch(key.size() + value.size() + 24);
   batch.Put(column_family.id, key, value);
   return Write(opt, &batch);
 }
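The 24-byte constant in the new reservation is just the sum of the fixed overheads listed in the comment. A sketch of the arithmetic:

#include <cassert>
#include <cstddef>
#include <string>

// Sketch of the reservation arithmetic behind
// WriteBatch(key.size() + value.size() + 24):
//   8 (sequence-number header) + 4 (count) + 1 (record type tag)
//   + 11 (slack for the two varint32 key/value lengths) = 24.
size_t ConservativeBatchSize(const std::string& key, const std::string& value) {
  const size_t kHeaderBytes = 8;
  const size_t kCountBytes = 4;
  const size_t kTypeBytes = 1;
  const size_t kVarintSlack = 11;
  return key.size() + value.size() +
         kHeaderBytes + kCountBytes + kTypeBytes + kVarintSlack;
}

int main() {
  assert(ConservativeBatchSize("key", "value") == 3 + 5 + 24);
  return 0;
}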
@@ -3915,20 +3939,20 @@ Status DB::OpenWithColumnFamilies(
     return s;
   }
   impl->mutex_.Lock();
-  VersionEdit edit(impl->NumberLevels());
+  VersionEdit edit;
   // Handles create_if_missing, error_if_exists
   s = impl->Recover(&edit, column_families);
   if (s.ok()) {
     uint64_t new_log_number = impl->versions_->NewFileNumber();
     unique_ptr<WritableFile> lfile;
     soptions.use_mmap_writes = false;
-    s = options.env->NewWritableFile(
+    s = impl->options_.env->NewWritableFile(
       LogFileName(impl->options_.wal_dir, new_log_number),
       &lfile,
       soptions
     );
     if (s.ok()) {
-      lfile->SetPreallocationBlockSize(1.1 * options.write_buffer_size);
+      lfile->SetPreallocationBlockSize(1.1 * impl->options_.write_buffer_size);
       edit.SetLogNumber(new_log_number);
       impl->logfile_number_ = new_log_number;
       impl->log_.reset(new log::Writer(std::move(lfile)));
@@ -3949,12 +3973,11 @@ Status DB::OpenWithColumnFamilies(
       impl->MaybeScheduleLogDBDeployStats();
     }
   }
-  impl->mutex_.Unlock();
 
-  if (s.ok() && options.compaction_style == kCompactionStyleUniversal) {
-    int num_files;
+  if (s.ok() && impl->options_.compaction_style == kCompactionStyleUniversal) {
+    Version* current = impl->versions_->current();
     for (int i = 1; i < impl->NumberLevels(); i++) {
-      num_files = impl->versions_->NumLevelFiles(i);
+      int num_files = current->NumLevelFiles(i);
       if (num_files > 0) {
         s = Status::InvalidArgument("Not all files are at level 0. Cannot "
                                     "open with universal compaction style.");
@@ -3963,6 +3986,8 @@ Status DB::OpenWithColumnFamilies(
     }
   }
 
+  impl->mutex_.Unlock();
+
   if (s.ok()) {
     *dbptr = impl;
   } else {