mirror of
https://github.com/XRPLF/rippled.git
synced 2025-12-06 17:27:55 +00:00
Fix a number of object lifetime/ownership issues
Summary: Replace manual memory management with std::unique_ptr in a number of places; not exhaustive, but this fixes a few leaks with file handles as well as clarifies semantics of the ownership of file handles with log classes. Test Plan: db_stress, make check Reviewers: dhruba Reviewed By: dhruba CC: zshao, leveldb, heyongqiang Differential Revision: https://reviews.facebook.net/D8043
This commit is contained in:
141
db/db_impl.cc
141
db/db_impl.cc
@@ -48,18 +48,17 @@ static Status NewLogger(const std::string& dbname,
|
||||
const std::string& db_log_dir,
|
||||
Env* env,
|
||||
size_t max_log_file_size,
|
||||
Logger** logger) {
|
||||
shared_ptr<Logger>* logger) {
|
||||
std::string db_absolute_path;
|
||||
env->GetAbsolutePath(dbname, &db_absolute_path);
|
||||
|
||||
if (max_log_file_size > 0) { // need to auto split the log file?
|
||||
AutoSplitLogger<Logger>* auto_split_logger =
|
||||
auto logger_ptr =
|
||||
new AutoSplitLogger<Logger>(env, dbname, db_log_dir, max_log_file_size);
|
||||
Status s = auto_split_logger->GetStatus();
|
||||
logger->reset(logger_ptr);
|
||||
Status s = logger_ptr->GetStatus();
|
||||
if (!s.ok()) {
|
||||
delete auto_split_logger;
|
||||
} else {
|
||||
*logger = auto_split_logger;
|
||||
logger->reset();
|
||||
}
|
||||
return s;
|
||||
} else {
|
||||
@@ -103,8 +102,8 @@ struct DBImpl::CompactionState {
|
||||
std::list<uint64_t> allocated_file_numbers;
|
||||
|
||||
// State kept for output being generated
|
||||
WritableFile* outfile;
|
||||
TableBuilder* builder;
|
||||
unique_ptr<WritableFile> outfile;
|
||||
unique_ptr<TableBuilder> builder;
|
||||
|
||||
uint64_t total_bytes;
|
||||
|
||||
@@ -112,8 +111,6 @@ struct DBImpl::CompactionState {
|
||||
|
||||
explicit CompactionState(Compaction* c)
|
||||
: compaction(c),
|
||||
outfile(NULL),
|
||||
builder(NULL),
|
||||
total_bytes(0) {
|
||||
}
|
||||
};
|
||||
@@ -178,14 +175,11 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
|
||||
dbname, &internal_comparator_, &internal_filter_policy_, options)),
|
||||
internal_filter_policy_(options.filter_policy),
|
||||
owns_info_log_(options_.info_log != options.info_log),
|
||||
owns_cache_(options_.block_cache != options.block_cache),
|
||||
db_lock_(NULL),
|
||||
shutting_down_(NULL),
|
||||
bg_cv_(&mutex_),
|
||||
mem_(new MemTable(internal_comparator_, NumberLevels())),
|
||||
logfile_(NULL),
|
||||
logfile_number_(0),
|
||||
log_(NULL),
|
||||
tmp_batch_(new WriteBatch),
|
||||
bg_compaction_scheduled_(0),
|
||||
bg_logstats_scheduled_(false),
|
||||
@@ -206,13 +200,13 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
|
||||
stats_ = new CompactionStats[options.num_levels];
|
||||
// Reserve ten files or so for other uses and give the rest to TableCache.
|
||||
const int table_cache_size = options_.max_open_files - 10;
|
||||
table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
|
||||
table_cache_.reset(new TableCache(dbname_, &options_, table_cache_size));
|
||||
|
||||
versions_ = new VersionSet(dbname_, &options_, table_cache_,
|
||||
&internal_comparator_);
|
||||
versions_.reset(new VersionSet(dbname_, &options_, table_cache_.get(),
|
||||
&internal_comparator_));
|
||||
|
||||
dumpLeveldbBuildVersion(options_.info_log);
|
||||
options_.Dump(options_.info_log);
|
||||
dumpLeveldbBuildVersion(options_.info_log.get());
|
||||
options_.Dump(options_.info_log.get());
|
||||
|
||||
#ifdef USE_SCRIBE
|
||||
logger_ = new ScribeLogger("localhost", 1456);
|
||||
@@ -246,21 +240,11 @@ DBImpl::~DBImpl() {
|
||||
env_->UnlockFile(db_lock_);
|
||||
}
|
||||
|
||||
delete versions_;
|
||||
if (mem_ != NULL) mem_->Unref();
|
||||
imm_.UnrefAll();
|
||||
delete tmp_batch_;
|
||||
delete log_;
|
||||
delete logfile_;
|
||||
delete table_cache_;
|
||||
delete[] stats_;
|
||||
|
||||
if (owns_info_log_) {
|
||||
delete options_.info_log;
|
||||
}
|
||||
if (owns_cache_) {
|
||||
delete options_.block_cache;
|
||||
}
|
||||
if (options_.compression_per_level != NULL) {
|
||||
delete[] options_.compression_per_level;
|
||||
}
|
||||
@@ -288,6 +272,10 @@ void DBImpl::TEST_Destroy_DBImpl() {
|
||||
if (db_lock_ != NULL) {
|
||||
env_->UnlockFile(db_lock_);
|
||||
}
|
||||
|
||||
log_.reset();
|
||||
versions_.reset();
|
||||
table_cache_.reset();
|
||||
}
|
||||
|
||||
uint64_t DBImpl::TEST_Current_Manifest_FileNo() {
|
||||
@@ -302,21 +290,17 @@ Status DBImpl::NewDB() {
|
||||
new_db.SetLastSequence(0);
|
||||
|
||||
const std::string manifest = DescriptorFileName(dbname_, 1);
|
||||
WritableFile* file;
|
||||
unique_ptr<WritableFile> file;
|
||||
Status s = env_->NewWritableFile(manifest, &file);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
{
|
||||
log::Writer log(file);
|
||||
log::Writer log(std::move(file));
|
||||
std::string record;
|
||||
new_db.EncodeTo(&record);
|
||||
s = log.AddRecord(record);
|
||||
if (s.ok()) {
|
||||
s = file->Close();
|
||||
}
|
||||
}
|
||||
delete file;
|
||||
if (s.ok()) {
|
||||
// Make "CURRENT" file that points to the new manifest file.
|
||||
s = SetCurrentFile(env_, dbname_, 1);
|
||||
@@ -628,7 +612,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
|
||||
|
||||
// Open the log file
|
||||
std::string fname = LogFileName(dbname_, log_number);
|
||||
SequentialFile* file;
|
||||
unique_ptr<SequentialFile> file;
|
||||
Status status = env_->NewSequentialFile(fname, &file);
|
||||
if (!status.ok()) {
|
||||
MaybeIgnoreError(&status);
|
||||
@@ -638,14 +622,14 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
|
||||
// Create the log reader.
|
||||
LogReporter reporter;
|
||||
reporter.env = env_;
|
||||
reporter.info_log = options_.info_log;
|
||||
reporter.info_log = options_.info_log.get();
|
||||
reporter.fname = fname.c_str();
|
||||
reporter.status = (options_.paranoid_checks ? &status : NULL);
|
||||
// We intentially make log::Reader do checksumming even if
|
||||
// paranoid_checks==false so that corruptions cause entire commits
|
||||
// to be skipped instead of propagating bad information (like overly
|
||||
// large sequence numbers).
|
||||
log::Reader reader(file, &reporter, true/*checksum*/,
|
||||
log::Reader reader(std::move(file), &reporter, true/*checksum*/,
|
||||
0/*initial_offset*/);
|
||||
Log(options_.info_log, "Recovering log #%llu",
|
||||
(unsigned long long) log_number);
|
||||
@@ -703,7 +687,6 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
|
||||
}
|
||||
|
||||
if (mem != NULL && !external_table) mem->Unref();
|
||||
delete file;
|
||||
return status;
|
||||
}
|
||||
|
||||
@@ -720,7 +703,7 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) {
|
||||
Status s;
|
||||
{
|
||||
mutex_.Unlock();
|
||||
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
|
||||
s = BuildTable(dbname_, env_, options_, table_cache_.get(), iter, &meta);
|
||||
mutex_.Lock();
|
||||
}
|
||||
|
||||
@@ -766,7 +749,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
|
||||
Status s;
|
||||
{
|
||||
mutex_.Unlock();
|
||||
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
|
||||
s = BuildTable(dbname_, env_, options_, table_cache_.get(), iter, &meta);
|
||||
mutex_.Lock();
|
||||
}
|
||||
base->Unref();
|
||||
@@ -845,8 +828,9 @@ Status DBImpl::CompactMemTable(bool* madeProgress) {
|
||||
}
|
||||
|
||||
// Replace immutable memtable with the generated Table
|
||||
s = imm_.InstallMemtableFlushResults(m, versions_, s, &mutex_,
|
||||
options_.info_log, file_number, pending_outputs_);
|
||||
s = imm_.InstallMemtableFlushResults(
|
||||
m, versions_.get(), s, &mutex_, options_.info_log.get(),
|
||||
file_number, pending_outputs_);
|
||||
|
||||
if (s.ok()) {
|
||||
if (madeProgress) {
|
||||
@@ -1009,7 +993,7 @@ Status DBImpl::ReadFirstLine(const std::string& fname,
|
||||
}
|
||||
};
|
||||
|
||||
SequentialFile* file;
|
||||
unique_ptr<SequentialFile> file;
|
||||
Status status = env_->NewSequentialFile(fname, &file);
|
||||
|
||||
if (!status.ok()) {
|
||||
@@ -1019,10 +1003,10 @@ Status DBImpl::ReadFirstLine(const std::string& fname,
|
||||
|
||||
LogReporter reporter;
|
||||
reporter.env = env_;
|
||||
reporter.info_log = options_.info_log;
|
||||
reporter.info_log = options_.info_log.get();
|
||||
reporter.fname = fname.c_str();
|
||||
reporter.status = (options_.paranoid_checks ? &status : NULL);
|
||||
log::Reader reader(file, &reporter, true/*checksum*/,
|
||||
log::Reader reader(std::move(file), &reporter, true/*checksum*/,
|
||||
0/*initial_offset*/);
|
||||
std::string scratch;
|
||||
Slice record;
|
||||
@@ -1243,7 +1227,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
|
||||
}
|
||||
}
|
||||
|
||||
Compaction* c = NULL;
|
||||
unique_ptr<Compaction> c;
|
||||
bool is_manual = (manual_compaction_ != NULL) &&
|
||||
(manual_compaction_->in_progress == false);
|
||||
InternalKey manual_end;
|
||||
@@ -1251,10 +1235,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
|
||||
ManualCompaction* m = manual_compaction_;
|
||||
assert(!m->in_progress);
|
||||
m->in_progress = true; // another thread cannot pick up the same work
|
||||
c = versions_->CompactRange(m->level, m->begin, m->end);
|
||||
m->done = (c == NULL);
|
||||
if (c != NULL) {
|
||||
c.reset(versions_->CompactRange(m->level, m->begin, m->end));
|
||||
if (c) {
|
||||
manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
|
||||
} else {
|
||||
m->done = true;
|
||||
}
|
||||
Log(options_.info_log,
|
||||
"Manual compaction at level-%d from %s .. %s; will stop at %s\n",
|
||||
@@ -1263,11 +1248,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
|
||||
(m->end ? m->end->DebugString().c_str() : "(end)"),
|
||||
(m->done ? "(end)" : manual_end.DebugString().c_str()));
|
||||
} else if (!options_.disable_auto_compactions) {
|
||||
c = versions_->PickCompaction();
|
||||
c.reset(versions_->PickCompaction());
|
||||
}
|
||||
|
||||
Status status;
|
||||
if (c == NULL) {
|
||||
if (!c) {
|
||||
// Nothing to do
|
||||
Log(options_.info_log, "Compaction nothing to do");
|
||||
} else if (!is_manual && c->IsTrivialMove()) {
|
||||
@@ -1285,18 +1270,18 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
|
||||
static_cast<unsigned long long>(f->file_size),
|
||||
status.ToString().c_str(),
|
||||
versions_->LevelSummary(&tmp));
|
||||
versions_->ReleaseCompactionFiles(c, status);
|
||||
versions_->ReleaseCompactionFiles(c.get(), status);
|
||||
*madeProgress = true;
|
||||
} else {
|
||||
CompactionState* compact = new CompactionState(c);
|
||||
CompactionState* compact = new CompactionState(c.get());
|
||||
status = DoCompactionWork(compact);
|
||||
CleanupCompaction(compact);
|
||||
versions_->ReleaseCompactionFiles(c, status);
|
||||
versions_->ReleaseCompactionFiles(c.get(), status);
|
||||
c->ReleaseInputs();
|
||||
FindObsoleteFiles(deletion_state);
|
||||
*madeProgress = true;
|
||||
}
|
||||
delete c;
|
||||
c.reset();
|
||||
|
||||
if (status.ok()) {
|
||||
// Done
|
||||
@@ -1332,11 +1317,10 @@ void DBImpl::CleanupCompaction(CompactionState* compact) {
|
||||
if (compact->builder != NULL) {
|
||||
// May happen if we get a shutdown call in the middle of compaction
|
||||
compact->builder->Abandon();
|
||||
delete compact->builder;
|
||||
compact->builder.reset();
|
||||
} else {
|
||||
assert(compact->outfile == NULL);
|
||||
}
|
||||
delete compact->outfile;
|
||||
for (size_t i = 0; i < compact->outputs.size(); i++) {
|
||||
const CompactionState::Output& out = compact->outputs[i];
|
||||
pending_outputs_.erase(out.number);
|
||||
@@ -1397,8 +1381,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
|
||||
std::string fname = TableFileName(dbname_, file_number);
|
||||
Status s = env_->NewWritableFile(fname, &compact->outfile);
|
||||
if (s.ok()) {
|
||||
compact->builder = new TableBuilder(options_, compact->outfile,
|
||||
compact->compaction->level() + 1);
|
||||
compact->builder.reset(new TableBuilder(options_, compact->outfile.get(),
|
||||
compact->compaction->level() + 1));
|
||||
}
|
||||
return s;
|
||||
}
|
||||
@@ -1406,7 +1390,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
|
||||
Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
|
||||
Iterator* input) {
|
||||
assert(compact != NULL);
|
||||
assert(compact->outfile != NULL);
|
||||
assert(compact->outfile);
|
||||
assert(compact->builder != NULL);
|
||||
|
||||
const uint64_t output_number = compact->current_output()->number;
|
||||
@@ -1423,8 +1407,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
|
||||
const uint64_t current_bytes = compact->builder->FileSize();
|
||||
compact->current_output()->file_size = current_bytes;
|
||||
compact->total_bytes += current_bytes;
|
||||
delete compact->builder;
|
||||
compact->builder = NULL;
|
||||
compact->builder.reset();
|
||||
|
||||
// Finish and check for file errors
|
||||
if (s.ok() && !options_.disableDataSync) {
|
||||
@@ -1437,8 +1420,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
|
||||
if (s.ok()) {
|
||||
s = compact->outfile->Close();
|
||||
}
|
||||
delete compact->outfile;
|
||||
compact->outfile = NULL;
|
||||
compact->outfile.reset();
|
||||
|
||||
if (s.ok() && current_entries > 0) {
|
||||
// Verify that the table is usable
|
||||
@@ -1537,7 +1519,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
|
||||
|
||||
assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
|
||||
assert(compact->builder == NULL);
|
||||
assert(compact->outfile == NULL);
|
||||
assert(!compact->outfile);
|
||||
|
||||
SequenceNumber visible_at_tip = 0;
|
||||
SequenceNumber earliest_snapshot;
|
||||
@@ -1560,7 +1542,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
|
||||
mutex_.Unlock();
|
||||
|
||||
const uint64_t start_micros = env_->NowMicros();
|
||||
Iterator* input = versions_->MakeInputIterator(compact->compaction);
|
||||
unique_ptr<Iterator> input(versions_->MakeInputIterator(compact->compaction));
|
||||
input->SeekToFirst();
|
||||
Status status;
|
||||
ParsedInternalKey ikey;
|
||||
@@ -1587,7 +1569,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
|
||||
Slice* compaction_filter_value = NULL;
|
||||
if (compact->compaction->ShouldStopBefore(key) &&
|
||||
compact->builder != NULL) {
|
||||
status = FinishCompactionOutputFile(compact, input);
|
||||
status = FinishCompactionOutputFile(compact, input.get());
|
||||
if (!status.ok()) {
|
||||
break;
|
||||
}
|
||||
@@ -1690,7 +1672,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
|
||||
// Close output file if it is big enough
|
||||
if (compact->builder->FileSize() >=
|
||||
compact->compaction->MaxOutputFileSize()) {
|
||||
status = FinishCompactionOutputFile(compact, input);
|
||||
status = FinishCompactionOutputFile(compact, input.get());
|
||||
if (!status.ok()) {
|
||||
break;
|
||||
}
|
||||
@@ -1704,13 +1686,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
|
||||
status = Status::IOError("Deleting DB during compaction");
|
||||
}
|
||||
if (status.ok() && compact->builder != NULL) {
|
||||
status = FinishCompactionOutputFile(compact, input);
|
||||
status = FinishCompactionOutputFile(compact, input.get());
|
||||
}
|
||||
if (status.ok()) {
|
||||
status = input->status();
|
||||
}
|
||||
delete input;
|
||||
input = NULL;
|
||||
input.reset();
|
||||
|
||||
CompactionStats stats;
|
||||
stats.micros = env_->NowMicros() - start_micros - imm_micros;
|
||||
@@ -1950,9 +1931,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
|
||||
status = log_->AddRecord(WriteBatchInternal::Contents(updates));
|
||||
if (status.ok() && options.sync) {
|
||||
if (options_.use_fsync) {
|
||||
status = logfile_->Fsync();
|
||||
status = log_->file()->Fsync();
|
||||
} else {
|
||||
status = logfile_->Sync();
|
||||
status = log_->file()->Sync();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2117,18 +2098,15 @@ Status DBImpl::MakeRoomForWrite(bool force) {
|
||||
DelayLoggingAndReset();
|
||||
assert(versions_->PrevLogNumber() == 0);
|
||||
uint64_t new_log_number = versions_->NewFileNumber();
|
||||
WritableFile* lfile = NULL;
|
||||
unique_ptr<WritableFile> lfile;
|
||||
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
|
||||
if (!s.ok()) {
|
||||
// Avoid chewing through file number space in a tight loop.
|
||||
versions_->ReuseFileNumber(new_log_number);
|
||||
versions_->ReuseFileNumber(new_log_number);
|
||||
break;
|
||||
}
|
||||
delete log_;
|
||||
delete logfile_;
|
||||
logfile_ = lfile;
|
||||
logfile_number_ = new_log_number;
|
||||
log_ = new log::Writer(lfile);
|
||||
log_.reset(new log::Writer(std::move(lfile)));
|
||||
imm_.Add(mem_);
|
||||
mem_ = new MemTable(internal_comparator_, NumberLevels());
|
||||
mem_->Ref();
|
||||
@@ -2310,14 +2288,13 @@ Status DB::Open(const Options& options, const std::string& dbname,
|
||||
s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
|
||||
if (s.ok()) {
|
||||
uint64_t new_log_number = impl->versions_->NewFileNumber();
|
||||
WritableFile* lfile;
|
||||
unique_ptr<WritableFile> lfile;
|
||||
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
|
||||
&lfile);
|
||||
if (s.ok()) {
|
||||
edit.SetLogNumber(new_log_number);
|
||||
impl->logfile_ = lfile;
|
||||
impl->logfile_number_ = new_log_number;
|
||||
impl->log_ = new log::Writer(lfile);
|
||||
impl->log_.reset(new log::Writer(std::move(lfile)));
|
||||
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
|
||||
}
|
||||
if (s.ok()) {
|
||||
|
||||
Reference in New Issue
Block a user