diff --git a/db/log_reader.cc b/db/log_reader.cc index 6596cd84f1..1dc567413d 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -28,6 +28,8 @@ Reader::Reader(unique_ptr<SequentialFile>&& file, Reporter* reporter, backing_store_(new char[kBlockSize]), buffer_(), eof_(false), + read_error_(false), + eof_offset_(0), last_record_offset_(0), end_of_buffer_offset_(0), initial_offset_(initial_offset) { @@ -170,6 +172,69 @@ uint64_t Reader::LastRecordOffset() { return last_record_offset_; } +void Reader::UnmarkEOF() { + if (read_error_) { + return; + } + + eof_ = false; + + if (eof_offset_ == 0) { + return; + } + + // If the EOF was in the middle of a block (a partial block was read) we have + // to read the rest of the block as ReadPhysicalRecord can only read full + // blocks and expects the file position indicator to be aligned to the start + // of a block. + // + // consumed_bytes + buffer_.size() + remaining == kBlockSize + + size_t consumed_bytes = eof_offset_ - buffer_.size(); + size_t remaining = kBlockSize - eof_offset_; + + // backing_store_ is used to concatenate what is left in buffer_ and + // the remainder of the block. If buffer_ already uses backing_store_, + // we just append the new data. + if (buffer_.data() != backing_store_ + consumed_bytes) { + // buffer_ does not use backing_store_ for storage. + // Copy what is left in buffer_ to backing_store_. 
+ memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size()); + } + + Slice read_buffer; + Status status = file_->Read(remaining, &read_buffer, + backing_store_ + eof_offset_); + + size_t added = read_buffer.size(); + end_of_buffer_offset_ += added; + + if (!status.ok()) { + if (added > 0) { + ReportDrop(added, status); + } + + read_error_ = true; + return; + } + + if (read_buffer.data() != backing_store_ + eof_offset_) { + // Read did not write to backing_store_ + memmove(backing_store_ + eof_offset_, read_buffer.data(), + read_buffer.size()); + } + + buffer_ = Slice(backing_store_ + consumed_bytes, + eof_offset_ + added - consumed_bytes); + + if (added < remaining) { + eof_ = true; + eof_offset_ += added; + } else { + eof_offset_ = 0; + } +} + void Reader::ReportCorruption(size_t bytes, const char* reason) { ReportDrop(bytes, Status::Corruption(reason)); } @@ -184,7 +249,7 @@ void Reader::ReportDrop(size_t bytes, const Status& reason) { unsigned int Reader::ReadPhysicalRecord(Slice* result) { while (true) { if (buffer_.size() < (size_t)kHeaderSize) { - if (!eof_) { + if (!eof_ && !read_error_) { // Last read was a full read, so this is a trailer to skip buffer_.clear(); Status status = file_->Read(kBlockSize, &buffer_, backing_store_); @@ -192,10 +257,11 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) { if (!status.ok()) { buffer_.clear(); ReportDrop(kBlockSize, status); - eof_ = true; + read_error_ = true; return kEof; } else if (buffer_.size() < (size_t)kBlockSize) { eof_ = true; + eof_offset_ = buffer_.size(); } continue; } else if (buffer_.size() == 0) { diff --git a/db/log_reader.h b/db/log_reader.h index 8e277c8216..81d334da29 100644 --- a/db/log_reader.h +++ b/db/log_reader.h @@ -69,9 +69,10 @@ class Reader { // when we know more data has been written to the file. we can use this // function to force the reader to look again in the file. 
- void UnmarkEOF() { - eof_ = false; - } + // Also aligns the file position indicator to the start of the next block + // by reading the rest of the data from the EOF position to the end of the + // block that was partially read. + void UnmarkEOF(); SequentialFile* file() { return file_.get(); } @@ -82,6 +83,11 @@ class Reader { char* const backing_store_; Slice buffer_; bool eof_; // Last Read() indicated EOF by returning < kBlockSize + bool read_error_; // Error occurred while reading from file + + // Offset of the file position indicator within the last block when an + // EOF was detected. + size_t eof_offset_; // Offset of the last record returned by ReadRecord. uint64_t last_record_offset_; diff --git a/db/log_test.cc b/db/log_test.cc index dedbff0aae..6365188358 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -47,36 +47,93 @@ class LogTest { public: std::string contents_; + explicit StringDest(Slice& reader_contents) : + WritableFile(), + contents_(""), + reader_contents_(reader_contents), + last_flush_(0) { + reader_contents_ = Slice(contents_.data(), 0); + }; + virtual Status Close() { return Status::OK(); } - virtual Status Flush() { return Status::OK(); } + virtual Status Flush() { + ASSERT_TRUE(reader_contents_.size() <= last_flush_); + size_t offset = last_flush_ - reader_contents_.size(); + reader_contents_ = Slice( + contents_.data() + offset, + contents_.size() - offset); + last_flush_ = contents_.size(); + + return Status::OK(); + } virtual Status Sync() { return Status::OK(); } virtual Status Append(const Slice& slice) { contents_.append(slice.data(), slice.size()); return Status::OK(); } + void Drop(size_t bytes) { + contents_.resize(contents_.size() - bytes); + reader_contents_ = Slice( + reader_contents_.data(), reader_contents_.size() - bytes); + last_flush_ = contents_.size(); + } + + private: + Slice& reader_contents_; + size_t last_flush_; }; class StringSource : public SequentialFile { public: - Slice contents_; + Slice& contents_; bool 
force_error_; + size_t force_error_position_; + bool force_eof_; + size_t force_eof_position_; bool returned_partial_; - StringSource() : force_error_(false), returned_partial_(false) { } + explicit StringSource(Slice& contents) : + contents_(contents), + force_error_(false), + force_error_position_(0), + force_eof_(false), + force_eof_position_(0), + returned_partial_(false) { } virtual Status Read(size_t n, Slice* result, char* scratch) { ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error"; if (force_error_) { - force_error_ = false; - returned_partial_ = true; - return Status::Corruption("read error"); + if (force_error_position_ >= n) { + force_error_position_ -= n; + } else { + *result = Slice(contents_.data(), force_error_position_); + contents_.remove_prefix(force_error_position_); + force_error_ = false; + returned_partial_ = true; + return Status::Corruption("read error"); + } } if (contents_.size() < n) { n = contents_.size(); returned_partial_ = true; } - *result = Slice(contents_.data(), n); + + if (force_eof_) { + if (force_eof_position_ >= n) { + force_eof_position_ -= n; + } else { + force_eof_ = false; + n = force_eof_position_; + returned_partial_ = true; + } + } + + // By using scratch we ensure that caller has control over the + // lifetime of result.data() + memcpy(scratch, contents_.data(), n); + *result = Slice(scratch, n); + contents_.remove_prefix(n); return Status::OK(); } @@ -123,10 +180,10 @@ class LogTest { src->contents_ = dest_contents(); } + Slice reader_contents_; unique_ptr<StringDest> dest_holder_; unique_ptr<StringSource> source_holder_; ReportCollector report_; - bool reading_; Writer writer_; Reader reader_; @@ -135,16 +192,15 @@ class LogTest { static uint64_t initial_offset_last_record_offsets_[]; public: - LogTest() : dest_holder_(new StringDest), - source_holder_(new StringSource), - reading_(false), + LogTest() : reader_contents_(), + dest_holder_(new StringDest(reader_contents_)), + source_holder_(new 
StringSource(reader_contents_)), writer_(std::move(dest_holder_)), reader_(std::move(source_holder_), &report_, true/*checksum*/, 0/*initial_offset*/) { } void Write(const std::string& msg) { - ASSERT_TRUE(!reading_) << "Write() after starting to read"; writer_.AddRecord(Slice(msg)); } @@ -153,10 +209,6 @@ class LogTest { } std::string Read() { - if (!reading_) { - reading_ = true; - reset_source_contents(); - } std::string scratch; Slice record; if (reader_.ReadRecord(&record, &scratch)) { @@ -175,7 +227,9 @@ class LogTest { } void ShrinkSize(int bytes) { - dest_contents().resize(dest_contents().size() - bytes); + auto dest = dynamic_cast<StringDest*>(writer_.file()); + assert(dest); + dest->Drop(bytes); } void FixChecksum(int header_offset, int len) { @@ -185,9 +239,10 @@ class LogTest { EncodeFixed32(&dest_contents()[header_offset], crc); } - void ForceError() { + void ForceError(size_t position = 0) { auto src = dynamic_cast<StringSource*>(reader_.file()); src->force_error_ = true; + src->force_error_position_ = position; } size_t DroppedBytes() const { @@ -198,6 +253,22 @@ class LogTest { return report_.message_; } + void ForceEOF(size_t position = 0) { + auto src = dynamic_cast<StringSource*>(reader_.file()); + src->force_eof_ = true; + src->force_eof_position_ = position; + } + + void UnmarkEOF() { + auto src = dynamic_cast<StringSource*>(reader_.file()); + src->returned_partial_ = false; + reader_.UnmarkEOF(); + } + + bool IsEOF() { + return reader_.IsEOF(); + } + // Returns OK iff recorded error message contains "msg" std::string MatchError(const std::string& msg) const { if (report_.message_.find(msg) == std::string::npos) { @@ -217,9 +288,7 @@ class LogTest { void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) { WriteInitialOffsetLog(); - reading_ = true; - unique_ptr<StringSource> source(new StringSource); - source->contents_ = dest_contents(); + unique_ptr<StringSource> source(new StringSource(reader_contents_)); unique_ptr<Reader> offset_reader( new Reader(std::move(source), &report_, true/*checksum*/, WrittenBytes() + 
offset_past_end)); @@ -231,9 +300,7 @@ class LogTest { void CheckInitialOffsetRecord(uint64_t initial_offset, int expected_record_offset) { WriteInitialOffsetLog(); - reading_ = true; - unique_ptr<StringSource> source(new StringSource); - source->contents_ = dest_contents(); + unique_ptr<StringSource> source(new StringSource(reader_contents_)); unique_ptr<Reader> offset_reader( new Reader(std::move(source), &report_, true/*checksum*/, initial_offset)); @@ -520,6 +587,70 @@ TEST(LogTest, ReadPastEnd) { CheckOffsetPastEndReturnsNoRecords(5); } +TEST(LogTest, ClearEofSingleBlock) { + Write("foo"); + Write("bar"); + ForceEOF(3 + kHeaderSize + 2); + ASSERT_EQ("foo", Read()); + UnmarkEOF(); + ASSERT_EQ("bar", Read()); + ASSERT_TRUE(IsEOF()); + ASSERT_EQ("EOF", Read()); + Write("xxx"); + UnmarkEOF(); + ASSERT_EQ("xxx", Read()); + ASSERT_TRUE(IsEOF()); +} + +TEST(LogTest, ClearEofMultiBlock) { + size_t num_full_blocks = 5; + size_t n = (kBlockSize - kHeaderSize) * num_full_blocks + 25; + Write(BigString("foo", n)); + Write(BigString("bar", n)); + ForceEOF(n + num_full_blocks * kHeaderSize + 10); + ASSERT_EQ(BigString("foo", n), Read()); + ASSERT_TRUE(IsEOF()); + UnmarkEOF(); + ASSERT_EQ(BigString("bar", n), Read()); + ASSERT_TRUE(IsEOF()); + Write(BigString("xxx", n)); + UnmarkEOF(); + ASSERT_EQ(BigString("xxx", n), Read()); + ASSERT_TRUE(IsEOF()); +} + +TEST(LogTest, ClearEofError) { + // If an error occurs during Read() in UnmarkEOF(), the records contained + // in the buffer should be returned on subsequent calls of ReadRecord() + // until no more full records are left, whereafter ReadRecord() should return + // false to indicate that it cannot read any further. 
+ + Write("foo"); + Write("bar"); + UnmarkEOF(); + ASSERT_EQ("foo", Read()); + ASSERT_TRUE(IsEOF()); + Write("xxx"); + ForceError(0); + UnmarkEOF(); + ASSERT_EQ("bar", Read()); + ASSERT_EQ("EOF", Read()); +} + +TEST(LogTest, ClearEofError2) { + Write("foo"); + Write("bar"); + UnmarkEOF(); + ASSERT_EQ("foo", Read()); + Write("xxx"); + ForceError(3); + UnmarkEOF(); + ASSERT_EQ("bar", Read()); + ASSERT_EQ("EOF", Read()); + ASSERT_EQ(3U, DroppedBytes()); + ASSERT_EQ("OK", MatchError("read error")); +} + } // namespace log } // namespace rocksdb