diff --git a/db/db_impl.cc b/db/db_impl.cc index 3b4e77d652..b765c0f4bc 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2270,20 +2270,26 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { StopWatch sw(env_, options_.statistics, DB_WRITE); MutexLock l(&mutex_); - writers_.push_back(&w); - while (!w.done && &w != writers_.front()) { - w.cv.Wait(); - } - if (w.done) { - return w.status; + + // If WAL is disabled, we avoid any queueing. + if (!options.disableWAL) { + writers_.push_back(&w); + while (!w.done && &w != writers_.front()) { + w.cv.Wait(); + } + if (w.done) { + return w.status; + } } // May temporarily unlock and wait. Status status = MakeRoomForWrite(my_batch == nullptr); uint64_t last_sequence = versions_->LastSequence(); Writer* last_writer = &w; + if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions - WriteBatch* updates = BuildBatchGroup(&last_writer); + WriteBatch* updates = options.disableWAL ? my_batch : + BuildBatchGroup(&last_writer); const SequenceNumber current_sequence = last_sequence + 1; WriteBatchInternal::SetSequence(updates, current_sequence); int my_batch_count = WriteBatchInternal::Count(updates); @@ -2298,12 +2304,12 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { // and protects against concurrent loggers and concurrent writes // into mem_. { - mutex_.Unlock(); if (options.disableWAL) { + // If WAL is disabled, then we do not drop the mutex. We keep the + // mutex to protect concurrent insertions into the memtable. flush_on_destroy_ = true; - } - - if (!options.disableWAL) { + } else { + mutex_.Unlock(); status = log_->AddRecord(WriteBatchInternal::Contents(updates)); if (status.ok() && options.sync) { if (options_.use_fsync) { @@ -2328,25 +2334,29 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { versions_->SetLastSequence(last_sequence); last_flushed_sequence_ = current_sequence; } - mutex_.Lock(); + if (!options.disableWAL) { + mutex_.Lock(); + } } if (updates == &tmp_batch_) tmp_batch_.Clear(); } - while (true) { - Writer* ready = writers_.front(); - writers_.pop_front(); - if (ready != &w) { - ready->status = status; - ready->done = true; - ready->cv.Signal(); + if (!options.disableWAL) { + while (true) { + Writer* ready = writers_.front(); + writers_.pop_front(); + if (ready != &w) { + ready->status = status; + ready->done = true; + ready->cv.Signal(); + } + if (ready == last_writer) break; } - if (ready == last_writer) break; - } - // Notify new head of write queue - if (!writers_.empty()) { - writers_.front()->cv.Signal(); + // Notify new head of write queue + if (!writers_.empty()) { + writers_.front()->cv.Signal(); + } } return status; } @@ -2410,7 +2420,6 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { // REQUIRES: this thread is currently at the front of the writer queue Status DBImpl::MakeRoomForWrite(bool force) { mutex_.AssertHeld(); - assert(!writers_.empty()); bool allow_delay = !force; bool allow_rate_limit_delay = !force; uint64_t rate_limit_delay_millis = 0;