diff --git a/db/db_test.cc b/db/db_test.cc index c50822d739..2a8cee4e7c 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -4543,43 +4543,16 @@ TEST(DBTest, TransactionLogIterator) { { auto iter = OpenTransactionLogIter(0); ExpectRecords(3, iter); - assert(!iter->IsObsolete()); - iter->Next(); - assert(!iter->Valid()); - assert(!iter->IsObsolete()); - assert(iter->status().ok()); - - Reopen(&options); - env_->SleepForMicroseconds(2 * 1000 * 1000); + } + Reopen(&options); + env_->SleepForMicroseconds(2 * 1000 * 1000);{ Put("key4", DummyString(1024)); Put("key5", DummyString(1024)); Put("key6", DummyString(1024)); - - iter->Next(); - assert(!iter->Valid()); - assert(iter->IsObsolete()); - assert(iter->status().ok()); } { auto iter = OpenTransactionLogIter(0); ExpectRecords(6, iter); - assert(!iter->IsObsolete()); - iter->Next(); - assert(!iter->Valid()); - assert(!iter->IsObsolete()); - assert(iter->status().ok()); - - Put("key7", DummyString(1024)); - iter->Next(); - assert(iter->Valid()); - assert(iter->status().ok()); - - dbfull()->Flush(FlushOptions()); - Put("key8", DummyString(1024)); - iter->Next(); - assert(!iter->Valid()); - assert(iter->IsObsolete()); - assert(iter->status().ok()); } } while (ChangeCompactOptions()); } diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index f039c3426a..36b8932a5c 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -21,7 +21,6 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( files_(std::move(files)), started_(false), isValid_(false), - is_obsolete_(false), currentFileIndex_(0), currentBatchSeq_(0), currentLastSeq_(0), @@ -70,15 +69,14 @@ bool TransactionLogIteratorImpl::Valid() { return started_ && isValid_; } -bool TransactionLogIteratorImpl::RestrictedRead(Slice* record, - std::string* scratch) { - bool ret = currentLogReader_->ReadRecord(record, scratch); - - if (!reporter_.last_status.ok()) { - currentStatus_ = reporter_.last_status; +bool TransactionLogIteratorImpl::RestrictedRead( + Slice* record, + std::string* scratch) { + // Don't read if no more complete entries to read from logs + if (currentLastSeq_ >= dbimpl_->GetLatestSequenceNumber()) { + return false; } - - return ret; + return currentLogReader_->ReadRecord(record, scratch); } void TransactionLogIteratorImpl::SeekToStartSequence( @@ -88,7 +86,6 @@ void TransactionLogIteratorImpl::SeekToStartSequence( Slice record; started_ = false; isValid_ = false; - is_obsolete_ = false; if (files_->size() <= startFileIndex) { return; } @@ -97,18 +94,6 @@ void TransactionLogIteratorImpl::SeekToStartSequence( currentStatus_ = s; return; } - auto latest_seq_num = dbimpl_->GetLatestSequenceNumber(); - if (startingSequenceNumber_ > latest_seq_num) { - if (strict) { - currentStatus_ = Status::Corruption("Gap in sequence number. Could not " - "seek to required sequence number"); - reporter_.Info(currentStatus_.ToString().c_str()); - } else { - // isValid_ is false; - return; - } - } - while (RestrictedRead(&record, &scratch)) { if (record.size() < 12) { reporter_.Corruption( @@ -138,11 +123,11 @@ void TransactionLogIteratorImpl::SeekToStartSequence( // only file. Otherwise log the error and let the iterator return next entry // If strict is set, we want to seek exactly till the start sequence and it // should have been present in the file we scanned above - if (strict || files_->size() == 1) { + if (strict) { currentStatus_ = Status::Corruption("Gap in sequence number. Could not " "seek to required sequence number"); reporter_.Info(currentStatus_.ToString().c_str()); - } else { + } else if (files_->size() != 1) { currentStatus_ = Status::Corruption("Start sequence was not found, " "skipping to the next available"); reporter_.Info(currentStatus_.ToString().c_str()); @@ -164,30 +149,11 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) { // Runs every time until we can seek to the start sequence return SeekToStartSequence(); } - - is_obsolete_ = false; - auto latest_seq_num = dbimpl_->GetLatestSequenceNumber(); - if (currentLastSeq_ >= latest_seq_num) { - isValid_ = false; - return; - } - - bool first = true; - while (currentFileIndex_ < files_->size()) { - if (!first) { - Status status =OpenLogReader(files_->at(currentFileIndex_).get()); - if (!status.ok()) { - isValid_ = false; - currentStatus_ = status; - return; - } - } - first = false; + while(true) { assert(currentLogReader_); if (currentLogReader_->IsEOF()) { currentLogReader_->UnmarkEOF(); } - while (RestrictedRead(&record, &scratch)) { if (record.size() < 12) { reporter_.Corruption( @@ -205,14 +171,26 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) { return; } } - // Open the next file - ++currentFileIndex_; - } - // Read all the files but cannot find next record expected. - // TODO(sdong): support to auto fetch new log files from DB and continue. - isValid_ = false; - is_obsolete_ = true; + // Open the next file + if (currentFileIndex_ < files_->size() - 1) { + ++currentFileIndex_; + Status status =OpenLogReader(files_->at(currentFileIndex_).get()); + if (!status.ok()) { + isValid_ = false; + currentStatus_ = status; + return; + } + } else { + isValid_ = false; + if (currentLastSeq_ == dbimpl_->GetLatestSequenceNumber()) { + currentStatus_ = Status::OK(); + } else { + currentStatus_ = Status::Corruption("NO MORE DATA LEFT"); + } + return; + } + } } bool TransactionLogIteratorImpl::IsBatchExpected( diff --git a/db/transaction_log_impl.h b/db/transaction_log_impl.h index 4c85379b65..6454d89e76 100644 --- a/db/transaction_log_impl.h +++ b/db/transaction_log_impl.h @@ -19,9 +19,7 @@ namespace rocksdb { struct LogReporter : public log::Reader::Reporter { Env* env; Logger* info_log; - Status last_status; virtual void Corruption(size_t bytes, const Status& s) { - last_status = s; Log(info_log, "dropping %zu bytes; %s", bytes, s.ToString().c_str()); } virtual void Info(const char* s) { @@ -76,8 +74,6 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { virtual bool Valid(); - virtual bool IsObsolete() override { return is_obsolete_; } - virtual void Next(); virtual Status status(); @@ -93,7 +89,6 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { std::unique_ptr files_; bool started_; bool isValid_; // not valid when it starts of. - bool is_obsolete_; Status currentStatus_; size_t currentFileIndex_; std::unique_ptr currentBatch_; diff --git a/include/rocksdb/transaction_log.h b/include/rocksdb/transaction_log.h index 1bb259b1fd..30443bba55 100644 --- a/include/rocksdb/transaction_log.h +++ b/include/rocksdb/transaction_log.h @@ -73,12 +73,6 @@ class TransactionLogIterator { // Can read data from a valid iterator. virtual bool Valid() = 0; - // IsObsolete() returns true if new log files were created. This usually - // means that the user needs to close the current iterator and create a new - // one to get the newest updates. It should happen only when mem tables are - // flushed. - virtual bool IsObsolete() = 0; - // Moves the iterator to the next WriteBatch. // REQUIRES: Valid() to be true. virtual void Next() = 0; diff --git a/tools/db_repl_stress.cc b/tools/db_repl_stress.cc index e336908ce4..27cb6d5abf 100644 --- a/tools/db_repl_stress.cc +++ b/tools/db_repl_stress.cc @@ -67,21 +67,8 @@ static void ReplicationThreadBody(void* arg) { } } fprintf(stderr, "Refreshing iterator\n"); - for (; !iter->IsObsolete(); iter->Next()) { - if (!iter->Valid()) { - if (t->stop.Acquire_Load() == nullptr) { - return; - } - // need to wait for new rows. - continue; - } - + for(;iter->Valid(); iter->Next(), t->no_read++, currentSeqNum++) { BatchResult res = iter->GetBatch(); - if (!iter->status().ok()) { - fprintf(stderr, "Corruption reported when reading seq no. b/w %ld", - static_cast(currentSeqNum)); - exit(1); - } if (res.sequence != currentSeqNum) { fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n", @@ -89,8 +76,6 @@ static void ReplicationThreadBody(void* arg) { (long)res.sequence); exit(1); } - t->no_read++; - currentSeqNum++; } } }