Merge branch 'master' into columnfamilies

Conflicts:
	HISTORY.md
	db/db_impl.cc
	db/db_impl.h
	db/db_iter.cc
	db/db_test.cc
	db/dbformat.h
	db/memtable.cc
	db/memtable_list.cc
	db/memtable_list.h
	db/table_cache.cc
	db/table_cache.h
	db/version_edit.h
	db/version_set.cc
	db/version_set.h
	db/write_batch.cc
	db/write_batch_test.cc
	include/rocksdb/options.h
	util/options.cc
Igor Canadi committed 2014-02-06 15:42:16 -08:00
104 changed files with 6225 additions and 2358 deletions

db/db_impl.cc

@@ -22,13 +22,13 @@
#include <vector>
#include "db/builder.h"
#include "db/dbformat.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/memtablelist.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/prefix_filter_iterator.h"
@@ -48,12 +48,13 @@
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "port/port.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/merger.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "util/auto_roll_logger.h"
#include "util/autovector.h"
#include "util/build_version.h"
#include "util/coding.h"
#include "util/hash_skiplist_rep.h"
@@ -61,13 +62,12 @@
#include "util/mutexlock.h"
#include "util/perf_context_imp.h"
#include "util/stop_watch.h"
#include "util/autovector.h"
namespace rocksdb {
const std::string default_column_family_name("default");
void dumpLeveldbBuildVersion(Logger * log);
void DumpLeveldbBuildVersion(Logger * log);
// Information kept for every waiting writer
struct DBImpl::Writer {
@@ -141,7 +141,10 @@ Options SanitizeOptions(const std::string& dbname,
DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
DBOptions result = src;
ClipToRange(&result.max_open_files, 20, 1000000);
// result.max_open_files means an "infinite" open files.
if (result.max_open_files != -1) {
ClipToRange(&result.max_open_files, 20, 1000000);
}
if (result.max_background_flushes == 0) {
result.max_background_flushes = 1;
}
@@ -210,10 +213,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
: env_(options.env),
dbname_(dbname),
options_(SanitizeOptions(dbname, options)),
// Reserve ten files or so for other uses and give the rest to TableCache.
table_cache_(NewLRUCache(options_.max_open_files - 10,
options_.table_cache_numshardbits,
options_.table_cache_remove_scan_count_limit)),
db_lock_(nullptr),
mutex_(options.use_adaptive_mutex),
shutting_down_(nullptr),
@@ -239,18 +238,27 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
env_->GetAbsolutePath(dbname, &db_absolute_path_);
// Reserve ten files or so for other uses and give the rest to TableCache.
// Give a large number for setting of "infinite" open files.
const int table_cache_size =
(options_.max_open_files == -1) ? 4194304 : options_.max_open_files - 10;
// Reserve ten files or so for other uses and give the rest to TableCache.
table_cache_ =
NewLRUCache(table_cache_size, options_.table_cache_numshardbits,
options_.table_cache_remove_scan_count_limit);
versions_.reset(
new VersionSet(dbname_, &options_, storage_options_, table_cache_.get()));
column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
dumpLeveldbBuildVersion(options_.info_log.get());
DumpLeveldbBuildVersion(options_.info_log.get());
// TODO(icanadi) dump DBOptions and ColumnFamilyOptions separately
// options_.Dump(options_.info_log.get());
char name[100];
Status st = env_->GetHostName(name, 100L);
if (st.ok()) {
Status s = env_->GetHostName(name, 100L);
if (s.ok()) {
host_name_ = name;
} else {
Log(options_.info_log, "Can't get hostname, use localhost as host name.");
@@ -283,6 +291,10 @@ DBImpl::~DBImpl() {
env_->UnlockFile(db_lock_);
}
// versions need to be destroyed before table_cache since it can hold
// references to table_cache.
versions_.reset();
LogFlush(options_.info_log);
}
@@ -396,7 +408,7 @@ void DBImpl::MaybeDumpStats() {
}
// Returns the list of live files in 'sst_live' and the list
// of all files in the filesystem in 'all_files'.
// of all files in the filesystem in 'candidate_files'.
// no_full_scan = true -- never do the full scan using GetChildren()
// force = false -- don't force the full scan, except every
// options_.delete_obsolete_files_period_micros
@@ -448,15 +460,18 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
versions_->AddLiveFiles(&deletion_state.sst_live);
if (doing_the_full_scan) {
// set of all files in the directory
env_->GetChildren(dbname_, &deletion_state.all_files); // Ignore errors
// set of all files in the directory. We'll exclude files that are still
// alive in the subsequent processings.
env_->GetChildren(
dbname_, &deletion_state.candidate_files
); // Ignore errors
//Add log files in wal_dir
if (options_.wal_dir != dbname_) {
std::vector<std::string> log_files;
env_->GetChildren(options_.wal_dir, &log_files); // Ignore errors
deletion_state.all_files.insert(
deletion_state.all_files.end(),
deletion_state.candidate_files.insert(
deletion_state.candidate_files.end(),
log_files.begin(),
log_files.end()
);
@@ -469,11 +484,10 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
// files in sst_delete_files and log_delete_files.
// It is not necessary to hold the mutex when invoking this method.
void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
// check if there is anything to do
if (!state.all_files.size() &&
!state.sst_delete_files.size() &&
!state.log_delete_files.size()) {
if (state.candidate_files.empty() &&
state.sst_delete_files.empty() &&
state.log_delete_files.empty()) {
return;
}
@@ -483,100 +497,114 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
if (state.manifest_file_number == 0) {
return;
}
uint64_t number;
FileType type;
std::vector<std::string> old_log_files;
// Now, convert live list to an unordered set, WITHOUT mutex held;
// set is slow.
std::unordered_set<uint64_t> live_set(state.sst_live.begin(),
state.sst_live.end());
std::unordered_set<uint64_t> sst_live(
state.sst_live.begin(), state.sst_live.end()
);
state.all_files.reserve(state.all_files.size() +
state.sst_delete_files.size());
auto& candidate_files = state.candidate_files;
candidate_files.reserve(
candidate_files.size() +
state.sst_delete_files.size() +
state.log_delete_files.size());
// We may ignore the dbname when generating the file names.
const char* kDumbDbName = "";
for (auto file : state.sst_delete_files) {
state.all_files.push_back(TableFileName("", file->number).substr(1));
candidate_files.push_back(
TableFileName(kDumbDbName, file->number).substr(1)
);
delete file;
}
state.all_files.reserve(state.all_files.size() +
state.log_delete_files.size());
for (auto filenum : state.log_delete_files) {
if (filenum > 0) {
state.all_files.push_back(LogFileName("", filenum).substr(1));
for (auto file_num : state.log_delete_files) {
if (file_num > 0) {
candidate_files.push_back(
LogFileName(kDumbDbName, file_num).substr(1)
);
}
}
// dedup state.all_files so we don't try to delete the same
// dedup state.candidate_files so we don't try to delete the same
// file twice
sort(state.all_files.begin(), state.all_files.end());
auto unique_end = unique(state.all_files.begin(), state.all_files.end());
sort(candidate_files.begin(), candidate_files.end());
candidate_files.erase(
unique(candidate_files.begin(), candidate_files.end()),
candidate_files.end()
);
  for (size_t i = 0; state.all_files.begin() + i < unique_end; i++) {
    if (ParseFileName(state.all_files[i], &number, &type)) {
      bool keep = true;
      switch (type) {
        case kLogFile:
          keep = ((number >= state.log_number) ||
                  (number == state.prev_log_number));
          break;
        case kDescriptorFile:
          // Keep my manifest file, and any newer incarnations'
          // (in case there is a race that allows other incarnations)
          keep = (number >= state.manifest_file_number);
          break;
        case kTableFile:
          keep = (live_set.find(number) != live_set.end());
          break;
        case kTempFile:
          // Any temp files that are currently being written to must
          // be recorded in pending_outputs_, which is inserted into "live"
          keep = (live_set.find(number) != live_set.end());
          break;
        case kInfoLogFile:
          keep = true;
          if (number != 0) {
            old_log_files.push_back(state.all_files[i]);
          }
          break;
        case kCurrentFile:
        case kDBLockFile:
        case kIdentityFile:
        case kMetaDatabase:
          keep = true;
          break;
      }
      if (!keep) {
        if (type == kTableFile) {
          // evict from cache
          TableCache::Evict(table_cache_.get(), number);
        }
        std::string fname = ((type == kLogFile) ? options_.wal_dir : dbname_) +
            "/" + state.all_files[i];
        Log(options_.info_log,
            "Delete type=%d #%lu",
            int(type),
            (unsigned long)number);
        if (type == kLogFile &&
            (options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0)) {
          Status s = env_->RenameFile(fname,
              ArchivedLogFileName(options_.wal_dir, number));
          if (!s.ok()) {
            Log(options_.info_log,
                "RenameFile logfile #%lu FAILED -- %s\n",
                (unsigned long)number, s.ToString().c_str());
          }
        } else {
          Status s = env_->DeleteFile(fname);
          if (!s.ok()) {
            Log(options_.info_log, "Delete type=%d #%lu FAILED -- %s\n",
                int(type), (unsigned long)number, s.ToString().c_str());
          }
        }
      }
    }
  }
  for (const auto& to_delete : candidate_files) {
    uint64_t number;
    FileType type;
    // Ignore file if we cannot recognize it.
    if (!ParseFileName(to_delete, &number, &type)) {
      continue;
    }
    bool keep = true;
    switch (type) {
      case kLogFile:
        keep = ((number >= state.log_number) ||
                (number == state.prev_log_number));
        break;
      case kDescriptorFile:
        // Keep my manifest file, and any newer incarnations'
        // (in case there is a race that allows other incarnations)
        keep = (number >= state.manifest_file_number);
        break;
      case kTableFile:
        keep = (sst_live.find(number) != sst_live.end());
        break;
      case kTempFile:
        // Any temp files that are currently being written to must
        // be recorded in pending_outputs_, which is inserted into "live"
        keep = (sst_live.find(number) != sst_live.end());
        break;
      case kInfoLogFile:
        keep = true;
        if (number != 0) {
          old_log_files.push_back(to_delete);
        }
        break;
      case kCurrentFile:
      case kDBLockFile:
      case kIdentityFile:
      case kMetaDatabase:
        keep = true;
        break;
    }
    if (keep) {
      continue;
    }
    if (type == kTableFile) {
      // evict from cache
      TableCache::Evict(table_cache_.get(), number);
    }
    std::string fname = ((type == kLogFile) ? options_.wal_dir : dbname_) +
        "/" + to_delete;
    Log(options_.info_log,
        "Delete type=%d #%lu",
        int(type),
        (unsigned long)number);
    Status st;
    if (type == kLogFile && (options_.WAL_ttl_seconds > 0 ||
          options_.WAL_size_limit_MB > 0)) {
      st = env_->RenameFile(fname,
                            ArchivedLogFileName(options_.wal_dir, number));
      if (!st.ok()) {
        Log(options_.info_log,
            "RenameFile logfile #%lu FAILED -- %s\n",
            (unsigned long)number, st.ToString().c_str());
      }
    } else {
      st = env_->DeleteFile(fname);
      if (!st.ok()) {
        Log(options_.info_log, "Delete type=%d #%lu FAILED -- %s\n",
            int(type), (unsigned long)number, st.ToString().c_str());
      }
    }
  }
@@ -805,10 +833,11 @@ Status DBImpl::Recover(
if (!s.ok()) {
return s;
}
uint64_t number;
FileType type;
std::vector<uint64_t> logs;
for (size_t i = 0; i < filenames.size(); i++) {
uint64_t number;
FileType type;
if (ParseFileName(filenames[i], &number, &type)
&& type == kLogFile
&& ((number >= min_log) || (number == prev_log))) {
@@ -824,12 +853,12 @@ Status DBImpl::Recover(
// Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end());
for (size_t i = 0; s.ok() && i < logs.size(); i++) {
for (const auto& log : logs) {
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(logs[i]);
s = RecoverLogFile(logs[i], &max_sequence, read_only);
versions_->MarkFileNumberUsed(log);
s = RecoverLogFile(log, &max_sequence, read_only);
}
if (s.ok()) {
@@ -1011,7 +1040,7 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
{
mutex_.Unlock();
s = BuildTable(dbname_, env_, *cfd->full_options(), storage_options_,
cfd->table_cache(), iter, &meta, cfd->user_comparator(),
cfd->table_cache(), iter, &meta, cfd->internal_comparator(),
newest_snapshot, earliest_seqno_in_memtable,
GetCompressionFlush(*cfd->full_options()));
LogFlush(options_.info_log);
@@ -1045,7 +1074,7 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
}
Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
std::vector<MemTable*>& mems, VersionEdit* edit,
autovector<MemTable*>& mems, VersionEdit* edit,
uint64_t* filenumber) {
mutex_.AssertHeld();
const uint64_t start_micros = env_->NowMicros();
@@ -1062,21 +1091,20 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
Status s;
{
mutex_.Unlock();
std::vector<Iterator*> list;
std::vector<Iterator*> memtables;
for (MemTable* m : mems) {
Log(options_.info_log,
"Flushing memtable with log file: %lu\n",
(unsigned long)m->GetLogNumber());
list.push_back(m->NewIterator());
memtables.push_back(m->NewIterator());
}
Iterator* iter =
NewMergingIterator(&cfd->internal_comparator(), &list[0], list.size());
Log(options_.info_log,
"Level-0 flush table #%lu: started",
Iterator* iter = NewMergingIterator(env_, &cfd->internal_comparator(),
&memtables[0], memtables.size());
Log(options_.info_log, "Level-0 flush table #%lu: started",
(unsigned long)meta.number);
s = BuildTable(dbname_, env_, *cfd->full_options(), storage_options_,
cfd->table_cache(), iter, &meta, cfd->user_comparator(),
cfd->table_cache(), iter, &meta, cfd->internal_comparator(),
newest_snapshot, earliest_seqno_in_memtable,
GetCompressionFlush(*cfd->full_options()));
LogFlush(options_.info_log);
@@ -1092,7 +1120,6 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
}
base->Unref();
// re-acquire the most current version
base = cfd->current();
@@ -1145,7 +1172,7 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
// Save the contents of the earliest memtable as a new Table
uint64_t file_number;
std::vector<MemTable*> mems;
autovector<MemTable*> mems;
cfd->imm()->PickMemtablesToFlush(&mems);
if (mems.empty()) {
Log(options_.info_log, "Nothing in memstore to flush");
@@ -1763,8 +1790,7 @@ Status DBImpl::BackgroundFlush(bool* madeProgress,
void DBImpl::BackgroundCallFlush() {
bool madeProgress = false;
DeletionState deletion_state(default_cfd_->options()->max_write_buffer_number,
true);
DeletionState deletion_state(true);
assert(bg_flush_scheduled_);
MutexLock l(&mutex_);
@@ -1815,8 +1841,7 @@ uint64_t DBImpl::TEST_GetLevel0TotalSize() {
void DBImpl::BackgroundCallCompaction() {
bool madeProgress = false;
DeletionState deletion_state(default_cfd_->options()->max_write_buffer_number,
true);
DeletionState deletion_state(true);
MaybeDumpStats();
@@ -2077,8 +2102,9 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
*cfd->full_options(), compact->compaction->output_level(),
compact->compaction->enable_compression());
compact->builder.reset(GetTableBuilder(
*cfd->full_options(), compact->outfile.get(), compression_type));
compact->builder.reset(
NewTableBuilder(*cfd->full_options(), cfd->internal_comparator(),
compact->outfile.get(), compression_type));
}
LogFlush(options_.info_log);
return s;
@@ -2126,8 +2152,9 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
if (s.ok() && current_entries > 0) {
// Verify that the table is usable
ColumnFamilyData* cfd = compact->compaction->column_family_data();
FileMetaData meta(output_number, current_bytes);
Iterator* iter = cfd->table_cache()->NewIterator(
ReadOptions(), storage_options_, output_number, current_bytes);
ReadOptions(), storage_options_, cfd->internal_comparator(), meta);
s = iter->status();
delete iter;
if (s.ok()) {
@@ -2641,8 +2668,9 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
// Collect iterators for files in L0 - Ln
super_version->current->AddIterators(options, storage_options_,
&iterator_list);
Iterator* internal_iter = NewMergingIterator(
&cfd->internal_comparator(), &iterator_list[0], iterator_list.size());
Iterator* internal_iter =
NewMergingIterator(env_, &cfd->internal_comparator(), &iterator_list[0],
iterator_list.size());
IterState* cleanup = new IterState(this, &mutex_, super_version);
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
@@ -2677,8 +2705,8 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
std::vector<Iterator*> list;
super_version->imm->AddIterators(options, &list);
super_version->current->AddIterators(options, storage_options_, &list);
Iterator* immutable_iter =
NewMergingIterator(&cfd->internal_comparator(), &list[0], list.size());
Iterator* immutable_iter = NewMergingIterator(
env_, &cfd->internal_comparator(), &list[0], list.size());
// create a DBIter that only uses memtable content; see NewIterator()
immutable_iter =
@@ -2739,6 +2767,8 @@ Status DBImpl::GetImpl(const ReadOptions& options,
const Slice& key, std::string* value,
bool* value_found) {
StopWatch sw(env_, options_.statistics.get(), DB_GET, false);
StopWatchNano snapshot_timer(env_, false);
StartPerfTimer(&snapshot_timer);
mutex_.Lock();
auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
@@ -2766,6 +2796,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
// s is both in/out. When in, s could either be OK or MergeInProgress.
// merge_operands will contain the sequence of merges in the latter case.
LookupKey lkey(key, snapshot);
BumpPerfTime(&perf_context.get_snapshot_time, &snapshot_timer);
if (get_version->mem->Get(lkey, value, &s, merge_context,
*cfd->full_options())) {
// Done
@@ -2775,12 +2806,19 @@ Status DBImpl::GetImpl(const ReadOptions& options,
// Done
RecordTick(options_.statistics.get(), MEMTABLE_HIT);
} else {
StopWatchNano from_files_timer(env_, false);
StartPerfTimer(&from_files_timer);
get_version->current->Get(options, lkey, value, &s, &merge_context, &stats,
*cfd->full_options(), value_found);
have_stat_update = true;
BumpPerfTime(&perf_context.get_from_output_files_time, &from_files_timer);
RecordTick(options_.statistics.get(), MEMTABLE_MISS);
}
StopWatchNano post_process_timer(env_, false);
StartPerfTimer(&post_process_timer);
bool delete_get_version = false;
if (!cfd->options()->disable_seek_compaction && have_stat_update) {
mutex_.Lock();
@@ -2805,8 +2843,10 @@ Status DBImpl::GetImpl(const ReadOptions& options,
}
// Note, tickers are atomic now - no lock protection needed any more.
RecordTick(options_.statistics.get(), NUMBER_KEYS_READ);
RecordTick(options_.statistics.get(), BYTES_READ, value->size());
BumpPerfTime(&perf_context.get_post_process_time, &post_process_timer);
return s;
}
@@ -2816,6 +2856,9 @@ std::vector<Status> DBImpl::MultiGet(
const std::vector<Slice>& keys, std::vector<std::string>* values) {
StopWatch sw(env_, options_.statistics.get(), DB_MULTIGET, false);
StopWatchNano snapshot_timer(env_, false);
StartPerfTimer(&snapshot_timer);
SequenceNumber snapshot;
struct MultiGetColumnFamilyData {
@@ -2856,6 +2899,7 @@ std::vector<Status> DBImpl::MultiGet(
// Keep track of bytes that we read for statistics-recording later
uint64_t bytes_read = 0;
BumpPerfTime(&perf_context.get_snapshot_time, &snapshot_timer);
// For each of the given keys, apply the entire "get" process as follows:
// First look in the memtable, then in the immutable memtable (if any).
@@ -2889,6 +2933,9 @@ std::vector<Status> DBImpl::MultiGet(
}
}
// Post processing (decrement reference counts and record statistics)
StopWatchNano post_process_timer(env_, false);
StartPerfTimer(&post_process_timer);
autovector<SuperVersion*> superversions_to_delete;
bool schedule_flush_or_compaction = false;
@@ -2921,6 +2968,7 @@ std::vector<Status> DBImpl::MultiGet(
RecordTick(options_.statistics.get(), NUMBER_MULTIGET_CALLS);
RecordTick(options_.statistics.get(), NUMBER_MULTIGET_KEYS_READ, num_keys);
RecordTick(options_.statistics.get(), NUMBER_MULTIGET_BYTES_READ, bytes_read);
BumpPerfTime(&perf_context.get_post_process_time, &post_process_timer);
return stat_list;
}
@@ -3080,6 +3128,8 @@ Status DBImpl::Delete(const WriteOptions& options,
}
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
StopWatchNano pre_post_process_timer(env_, false);
StartPerfTimer(&pre_post_process_timer);
Writer w(&mutex_);
w.batch = my_batch;
w.sync = options.sync;
@@ -3148,6 +3198,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
if (options.disableWAL) {
flush_on_destroy_ = true;
}
BumpPerfTime(&perf_context.write_pre_and_post_process_time,
&pre_post_process_timer);
if (!options.disableWAL) {
StopWatchNano timer(env_);
@@ -3156,7 +3208,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
status = log_->AddRecord(log_entry);
RecordTick(options_.statistics.get(), WAL_FILE_SYNCED, 1);
RecordTick(options_.statistics.get(), WAL_FILE_BYTES, log_entry.size());
BumpPerfTime(&perf_context.wal_write_time, &timer);
if (status.ok() && options.sync) {
if (options_.use_fsync) {
StopWatch(env_, options_.statistics.get(), WAL_FILE_SYNC_MICROS);
@@ -3166,12 +3217,17 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
status = log_->file()->Sync();
}
}
BumpPerfTime(&perf_context.write_wal_time, &timer);
}
if (status.ok()) {
StopWatchNano write_memtable_timer(env_, false);
// reading the column family set outside of DB mutex -- should lock
versions_->GetColumnFamilySet()->Lock();
StartPerfTimer(&write_memtable_timer);
status = WriteBatchInternal::InsertInto(
updates, column_family_memtables_.get(), 0, this, false);
BumpPerfTime(&perf_context.write_memtable_time, &write_memtable_timer);
versions_->GetColumnFamilySet()->Unlock();
if (!status.ok()) {
@@ -3184,6 +3240,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
SetTickerCount(options_.statistics.get(), SEQUENCE_NUMBER,
last_sequence);
}
StartPerfTimer(&pre_post_process_timer);
if (updates == &tmp_batch_) tmp_batch_.Clear();
mutex_.Lock();
if (status.ok()) {
@@ -3211,6 +3268,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
writers_.front()->cv.Signal();
}
mutex_.Unlock();
BumpPerfTime(&perf_context.write_pre_and_post_process_time,
&pre_post_process_timer);
return status;
}
@@ -3420,7 +3479,7 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
} else {
unique_ptr<WritableFile> lfile;
MemTable* memtmp = nullptr;
MemTable* new_mem = nullptr;
// Attempt to switch to a new memtable and trigger compaction of old.
// Do this without holding the dbmutex lock.
@@ -3439,7 +3498,7 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
// (compression, etc) but err on the side of caution.
lfile->SetPreallocationBlockSize(1.1 *
cfd->options()->write_buffer_size);
memtmp = new MemTable(cfd->internal_comparator(), *cfd->options());
new_mem = new MemTable(cfd->internal_comparator(), *cfd->options());
new_superversion = new SuperVersion();
}
}
@@ -3447,7 +3506,7 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
if (!s.ok()) {
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
assert (!memtmp);
assert (!new_mem);
break;
}
logfile_number_ = new_log_number;
@@ -3457,12 +3516,12 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
if (force) {
cfd->imm()->FlushRequested();
}
memtmp->Ref();
memtmp->SetLogNumber(logfile_number_);
cfd->SetMemtable(memtmp);
new_mem->Ref();
new_mem->SetLogNumber(logfile_number_);
cfd->SetMemtable(new_mem);
Log(options_.info_log, "New memtable created with log file: #%lu\n",
(unsigned long)logfile_number_);
force = false; // Do not force another compaction if have room
force = false; // Do not force another compaction if have room
MaybeScheduleFlushOrCompaction();
delete cfd->InstallSuperVersion(new_superversion);
}
@@ -3552,10 +3611,10 @@ Status DBImpl::DeleteFile(std::string name) {
}
int level;
FileMetaData metadata;
FileMetaData *metadata;
ColumnFamilyData* cfd;
VersionEdit edit;
DeletionState deletion_state(0, true);
DeletionState deletion_state(true);
{
MutexLock l(&mutex_);
status = versions_->GetMetadataForFile(number, &level, &metadata, &cfd);
@@ -3567,7 +3626,7 @@ Status DBImpl::DeleteFile(std::string name) {
assert((level > 0) && (level < cfd->NumberLevels()));
// If the file is being compacted no need to delete.
if (metadata.being_compacted) {
if (metadata->being_compacted) {
Log(options_.info_log,
"DeleteFile %s Skipped. File about to be compacted\n", name.c_str());
return Status::OK();
@@ -3866,7 +3925,7 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
//
// A global method that can dump out the build version
void dumpLeveldbBuildVersion(Logger * log) {
void DumpLeveldbBuildVersion(Logger * log) {
Log(log, "Git sha %s", rocksdb_build_git_sha);
Log(log, "Compile time %s %s",
rocksdb_build_compile_time, rocksdb_build_compile_date);