diff --git a/db/db_impl.cc b/db/db_impl.cc index 570d04bd77..4c2728c2ff 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -907,7 +907,7 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq, probableWALFiles, &last_flushed_sequence_)); iter->get()->Next(); - return Status::OK(); + return iter->get()->status(); } Status DBImpl::FindProbableWALFiles(std::vector* const allLogs, diff --git a/db/db_test.cc b/db/db_test.cc index 15c0c6083b..fa61ebcdcb 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2700,6 +2700,22 @@ TEST(DBTest, TransactionLogIteratorCheckAfterRestart) { ExpectRecords(2, iter); } +TEST(DBTest, TransactionLogIteratorBatchOperations) { + Options options = OptionsForLogIterTest(); + DestroyAndReopen(&options); + WriteBatch batch; + batch.Put("key1", DummyString(1024)); + batch.Put("key2", DummyString(1024)); + batch.Put("key3", DummyString(1024)); + batch.Delete("key2"); + dbfull()->Write(WriteOptions(), &batch); + dbfull()->Flush(FlushOptions()); + Reopen(&options); + Put("key4", DummyString(1024)); + auto iter = OpenTransactionLogIter(3); + ExpectRecords(1, iter); +} + TEST(DBTest, ReadCompaction) { std::string value(4096, '4'); // a string of size 4K { diff --git a/db/transaction_log_iterator_impl.cc b/db/transaction_log_iterator_impl.cc index ed553cbe0f..a6edac238f 100644 --- a/db/transaction_log_iterator_impl.cc +++ b/db/transaction_log_iterator_impl.cc @@ -14,10 +14,10 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( dbname_(dbname), options_(options), soptions_(soptions), - sequenceNumber_(seq), + startingSequenceNumber_(seq), files_(files), started_(false), - isValid_(true), + isValid_(false), currentFileIndex_(0), lastFlushedSequence_(lastFlushedSequence) { assert(files_ != nullptr); @@ -73,54 +73,49 @@ bool TransactionLogIteratorImpl::Valid() { } void TransactionLogIteratorImpl::Next() { -// First seek to the given seqNo. in the current file. LogFile currentLogFile = files_->at(currentFileIndex_); LogReporter reporter = NewLogReporter(currentLogFile.logNumber); + +// First seek to the given seqNo. in the current file. std::string scratch; Slice record; - if (!started_) { + started_ = true; // this piece only runs onced. isValid_ = false; - if (sequenceNumber_ > *lastFlushedSequence_) { + if (startingSequenceNumber_ > *lastFlushedSequence_) { currentStatus_ = Status::IOError("Looking for a sequence, " "which is not flushed yet."); return; } - unique_ptr file; - Status status = OpenLogFile(currentLogFile, &file); - if (!status.ok()) { - currentStatus_ = status; + Status s = OpenLogReader(currentLogFile); + if (!s.ok()) { + currentStatus_ = s; + isValid_ = false; return; } - assert(file); - unique_ptr reader( - new log::Reader(std::move(file), &reporter, true, 0)); - assert(reader); - while (reader->ReadRecord(&record, &scratch)) { + while (currentLogReader_->ReadRecord(&record, &scratch)) { if (record.size() < 12) { reporter.Corruption( record.size(), Status::Corruption("log record too small")); continue; } UpdateCurrentWriteBatch(record); - if (currentSequence_ >= sequenceNumber_) { + if (currentSequence_ >= startingSequenceNumber_) { assert(currentSequence_ <= *lastFlushedSequence_); isValid_ = true; - currentLogReader_ = std::move(reader); break; + } else { + isValid_ = false; } } - if (!isValid_) { - // TODO read the entire first file. and did not find the seq number. - // Error out. - currentStatus_ = - Status::NotFound("Did not find the Seq no. in first file"); + if (isValid_) { + // Done for this iteration + return; } - started_ = true; - } else { -LOOK_NEXT_FILE: + } + bool openNextFile = true; + while(openNextFile) { assert(currentLogReader_); - bool openNextFile = true; if (currentSequence_ < *lastFlushedSequence_) { if (currentLogReader_->IsEOF()) { currentLogReader_->UnmarkEOF(); @@ -141,28 +136,22 @@ LOOK_NEXT_FILE: if (openNextFile) { if (currentFileIndex_ < files_->size() - 1) { ++currentFileIndex_; - currentLogReader_.reset(); - unique_ptr file; - Status status = OpenLogFile(files_->at(currentFileIndex_), &file); + Status status = OpenLogReader(files_->at(currentFileIndex_)); if (!status.ok()) { isValid_ = false; currentStatus_ = status; return; } - currentLogReader_.reset( - new log::Reader(std::move(file), &reporter, true, 0)); - goto LOOK_NEXT_FILE; - } else if (currentSequence_ == *lastFlushedSequence_) { - // The last update has been read. and next is being called. - isValid_ = false; - currentStatus_ = Status::OK(); } else { - // LOOKED AT FILES. WE ARE DONE HERE. isValid_ = false; - currentStatus_ = Status::IOError(" NO MORE DATA LEFT"); + openNextFile = false; + if (currentSequence_ == *lastFlushedSequence_) { + currentStatus_ = Status::OK(); + } else { + currentStatus_ = Status::IOError(" NO MORE DATA LEFT"); + } } } - } } @@ -175,4 +164,17 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { currentStatus_ = Status::OK(); } +Status TransactionLogIteratorImpl::OpenLogReader(const LogFile& logFile) { + LogReporter reporter = NewLogReporter(logFile.logNumber); + unique_ptr file; + Status status = OpenLogFile(logFile, &file); + if (!status.ok()) { + return status; + } + assert(file); + currentLogReader_.reset( + new log::Reader(std::move(file), &reporter, true, 0) + ); + return Status::OK(); +} } // namespace leveldb diff --git a/db/transaction_log_iterator_impl.h b/db/transaction_log_iterator_impl.h index 8891abf5c7..fccbb15050 100644 --- a/db/transaction_log_iterator_impl.h +++ b/db/transaction_log_iterator_impl.h @@ -50,7 +50,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { const std::string& dbname_; const Options* options_; const StorageOptions& soptions_; - const uint64_t sequenceNumber_; + const uint64_t startingSequenceNumber_; const std::vector* files_; bool started_; bool isValid_; // not valid when it starts of. @@ -65,6 +65,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { SequenceNumber currentSequence_; void UpdateCurrentWriteBatch(const Slice& record); + Status OpenLogReader(const LogFile& file); };