diff --git a/db/db_impl.cc b/db/db_impl.cc index e1a2154161..cf73158fd1 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -23,6 +23,7 @@ #include "db/table_cache.h" #include "db/version_set.h" #include "db/write_batch_internal.h" +#include "db/transaction_log_iterator_impl.h" #include "leveldb/db.h" #include "leveldb/env.h" #include "leveldb/statistics.h" @@ -484,7 +485,7 @@ void DBImpl::DeleteObsoleteFiles() { void DBImpl::PurgeObsoleteWALFiles() { if (options_.WAL_ttl_seconds != ULONG_MAX && options_.WAL_ttl_seconds > 0) { std::vector WALFiles; - std::string archivalDir = dbname_ + "/" + ARCHIVAL_DIR; + std::string archivalDir = GetArchivalDirectoryName(); env_->GetChildren(archivalDir, &WALFiles); int64_t currentTime; const Status status = env_->GetCurrentTime(¤tTime); @@ -884,6 +885,165 @@ Status DBImpl::Flush(const FlushOptions& options) { return status; } +Status DBImpl::GetUpdatesSince(SequenceNumber seq, + TransactionLogIterator** iter) { + + // Get All Log Files. + // Sort Files + // Get the first entry from each file. + // Do binary search and open files and find the seq number. + + std::vector walFiles; + // list wal files in main db dir. + Status s = ListAllWALFiles(dbname_, &walFiles, kAliveLogFile); + if (!s.ok()) { + return s; + } + // list wal files in archive dir. + s = ListAllWALFiles(GetArchivalDirectoryName(), &walFiles, kArchivedLogFile); + if (!s.ok()) { + return s; + } + + if (walFiles.empty()) { + return Status::IOError(" NO WAL Files present in the db"); + } + // std::shared_ptr would have been useful here. + + std::vector* probableWALFiles = new std::vector(); + FindProbableWALFiles(&walFiles, probableWALFiles, seq); + if (probableWALFiles->empty()) { + return Status::IOError(" No wal files for the given seqNo. found"); + } + + TransactionLogIteratorImpl* impl = + new TransactionLogIteratorImpl(dbname_, &options_, seq, probableWALFiles); + *iter = impl; + return Status::OK(); +} + +Status DBImpl::FindProbableWALFiles(std::vector* const allLogs, + std::vector* const result, + const SequenceNumber target) { + assert(allLogs != NULL); + assert(result != NULL); + + std::sort(allLogs->begin(), allLogs->end()); + size_t start = 0; + size_t end = allLogs->size(); + // Binary Search. avoid opening all files. + while (start < end) { + int mid = (start + end) / 2; + WriteBatch batch; + Status s = ReadFirstRecord(allLogs->at(mid), &batch); + if (!s.ok()) { + return s; + } + SequenceNumber currentSeqNum = WriteBatchInternal::Sequence(&batch); + if (currentSeqNum == target) { + start = mid; + end = mid; + } else if (currentSeqNum < target) { + start = mid; + } else { + end = mid; + } + } + assert( start == end); + for( size_t i = start; i < allLogs->size(); ++i) { + result->push_back(allLogs->at(i)); + } + return Status::OK(); +} + +Status DBImpl::ReadFirstRecord(const LogFile& file, WriteBatch* const result) { + + if (file.type == kAliveLogFile) { + std::string fname = LogFileName(dbname_, file.logNumber); + Status status = ReadFirstLine(fname, result); + if (!status.ok()) { + // check if the file got moved to archive. + std::string archivedFile = ArchivedLogFileName(dbname_, file.logNumber); + Status s = ReadFirstLine(archivedFile, result); + if (!s.ok()) { + return Status::IOError("Log File Has been deleted"); + } + } + return Status::OK(); + } else if (file.type == kArchivedLogFile) { + std::string fname = ArchivedLogFileName(dbname_, file.logNumber); + Status status = ReadFirstLine(fname, result); + return status; + } + return Status::NotSupported("File Type Not Known"); +} + +Status DBImpl::ReadFirstLine(const std::string& fname, + WriteBatch* const batch) { + struct LogReporter : public log::Reader::Reporter { + Env* env; + Logger* info_log; + const char* fname; + Status* status; // NULL if options_.paranoid_checks==false + virtual void Corruption(size_t bytes, const Status& s) { + Log(info_log, "%s%s: dropping %d bytes; %s", + (this->status == NULL ? "(ignoring error) " : ""), + fname, static_cast(bytes), s.ToString().c_str()); + if (this->status != NULL && this->status->ok()) *this->status = s; + } + }; + + SequentialFile* file; + Status status = env_->NewSequentialFile(fname, &file); + + if (!status.ok()) { + return status; + } + + + LogReporter reporter; + reporter.env = env_; + reporter.info_log = options_.info_log; + reporter.fname = fname.c_str(); + reporter.status = (options_.paranoid_checks ? &status : NULL); + log::Reader reader(file, &reporter, true/*checksum*/, + 0/*initial_offset*/); + std::string scratch; + Slice record; + if (reader.ReadRecord(&record, &scratch) && status.ok()) { + if (record.size() < 12) { + reporter.Corruption( + record.size(), Status::Corruption("log record too small")); + return Status::IOError("Corruption noted"); + // TODO read record's till the first no corrupt entry? + } + WriteBatchInternal::SetContents(batch, record); + return Status::OK(); + } + return Status::IOError("Error reading from file " + fname); +} + +Status DBImpl::ListAllWALFiles(const std::string& path, + std::vector* const logFiles, + WalFileType logType) { + assert(logFiles != NULL); + std::vector allFiles; + const Status status = env_->GetChildren(path, &allFiles); + if (!status.ok()) { + return status; + } + for(std::vector::iterator it = allFiles.begin(); + it != allFiles.end(); + ++it) { + uint64_t number; + FileType type; + if (ParseFileName(*it, &number, &type) && type == kLogFile){ + logFiles->push_back(LogFile(number, logType)); + } + } + return status; +} + void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { assert(level >= 0); diff --git a/db/db_impl.h b/db/db_impl.h index 833932defb..7b45e16712 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -8,6 +8,7 @@ #include #include #include "db/dbformat.h" +#include "db/log_file.h" #include "db/log_writer.h" #include "db/snapshot.h" #include "leveldb/db.h" @@ -54,7 +55,8 @@ class DBImpl : public DB { virtual Status EnableFileDeletions(); virtual Status GetLiveFiles(std::vector&, uint64_t* manifest_file_size); - + virtual Status GetUpdatesSince(SequenceNumber seq_number, + TransactionLogIterator ** iter); // Return's the path of the archival directory. std::string GetArchivalDirectoryName(); @@ -178,6 +180,22 @@ protected: void EvictObsoleteFiles(DeletionState& deletion_state); void PurgeObsoleteWALFiles(); + + Status ListAllWALFiles(const std::string& path, + std::vector* logFiles, + WalFileType type); + + // Find's all the log files which contain updates with seq no. + // Greater Than or Equal to the requested SequenceNumber + Status FindProbableWALFiles(std::vector* const allLogs, + std::vector* const result, + const SequenceNumber target); + + + Status ReadFirstRecord(const LogFile& file, WriteBatch* const result); + + + Status ReadFirstLine(const std::string& fname, WriteBatch* const batch); // Constant after construction const InternalFilterPolicy internal_filter_policy_; bool owns_info_log_; diff --git a/db/db_test.cc b/db/db_test.cc index 5cf46ed692..acafb1c95d 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2264,6 +2264,63 @@ TEST(DBTest, WALArchival) { } +TEST(DBTest, TransactionLogIterator) { + std::string value(1024, '1'); + Options options = CurrentOptions(); + options.create_if_missing = true; + options.WAL_ttl_seconds = 1000; + DestroyAndReopen(&options); + Put("key1", value); + Put("key2", value); + Put("key2", value); + { + TransactionLogIterator* iter; + Status status = dbfull()->GetUpdatesSince(0, &iter); + ASSERT_TRUE(status.ok()); + ASSERT_TRUE(!iter->Valid()); + iter->Next(); + int i = 0; + SequenceNumber lastSequence = 0; + while (iter->Valid()) { + WriteBatch batch; + iter->GetBatch(&batch); + SequenceNumber current = WriteBatchInternal::Sequence(&batch); + // ASSERT_TRUE(current > lastSequence); + ++i; + lastSequence = current; + ASSERT_TRUE(iter->status().ok()); + iter->Next(); + } + ASSERT_EQ(i, 3); + } + Reopen(&options); + { + Put("key4", value); + Put("key5", value); + Put("key6", value); + } + { + TransactionLogIterator* iter; + Status status = dbfull()->GetUpdatesSince(0, &iter); + ASSERT_TRUE(status.ok()); + ASSERT_TRUE(!iter->Valid()); + iter->Next(); + int i = 0; + SequenceNumber lastSequence = 0; + while (iter->Valid()) { + WriteBatch batch; + iter->GetBatch(&batch); + SequenceNumber current = WriteBatchInternal::Sequence(&batch); + ASSERT_TRUE(current > lastSequence); + lastSequence = current; + ASSERT_TRUE(iter->status().ok()); + iter->Next(); + ++i; + } + ASSERT_EQ(i, 6); + } +} + TEST(DBTest, ReadCompaction) { std::string value(4096, '4'); // a string of size 4K { @@ -2526,6 +2583,11 @@ class ModelDB: public DB { return Status::OK(); } + virtual Status GetUpdatesSince(leveldb::SequenceNumber, + leveldb::TransactionLogIterator**) { + return Status::NotSupported("Not supported in Model DB"); + } + private: class ModelIter: public Iterator { public: diff --git a/db/dbformat.h b/db/dbformat.h index 1ba74e60ba..89f42fbf5f 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -11,6 +11,7 @@ #include "leveldb/filter_policy.h" #include "leveldb/slice.h" #include "leveldb/table_builder.h" +#include "leveldb/types.h" #include "util/coding.h" #include "util/logging.h" @@ -33,8 +34,6 @@ enum ValueType { // ValueType, not the lowest). static const ValueType kValueTypeForSeek = kTypeValue; -typedef uint64_t SequenceNumber; - // We leave eight bits empty at the bottom so a type and sequence# // can be packed together into 64-bits. static const SequenceNumber kMaxSequenceNumber = diff --git a/db/filename.cc b/db/filename.cc index ad9bb7b4b1..ce4a49eeb0 100644 --- a/db/filename.cc +++ b/db/filename.cc @@ -2,13 +2,13 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include "db/filename.h" + #include #include -#include "db/filename.h" #include "db/dbformat.h" #include "leveldb/env.h" #include "util/logging.h" -#include namespace leveldb { @@ -57,6 +57,11 @@ std::string LogFileName(const std::string& name, uint64_t number) { return MakeFileName(name, number, "log"); } +std::string ArchivedLogFileName(const std::string& name, uint64_t number) { + assert(number > 0); + return MakeFileName(name + "/archive", number, "log"); +} + std::string TableFileName(const std::string& name, uint64_t number) { assert(number > 0); return MakeFileName(name, number, "sst"); diff --git a/db/filename.h b/db/filename.h index ddf41cb4d7..e71d0336a7 100644 --- a/db/filename.h +++ b/db/filename.h @@ -32,6 +32,11 @@ enum FileType { // "dbname". extern std::string LogFileName(const std::string& dbname, uint64_t number); +// Return the name of the archived log file with the specified number +// in the db named by "dbname". The result will be prefixed with "dbname". +extern std::string ArchivedLogFileName(const std::string& dbname, + uint64_t num); + // Return the name of the sstable with the specified number // in the db named by "dbname". The result will be prefixed with // "dbname". diff --git a/db/log_file.h b/db/log_file.h new file mode 100644 index 0000000000..da7808e65d --- /dev/null +++ b/db/log_file.h @@ -0,0 +1,48 @@ +// Copyright 2008-present Facebook. All Rights Reserved. + +#ifndef STORAGE_LEVELDB_DB_LOG_FILE_H_ +#define STORAGE_LEVELDB_DB_LOG_FILE_H_ + +namespace leveldb { + +enum WalFileType { + kArchivedLogFile = 0, + kAliveLogFile = 1 +} ; + +class LogFile { + + public: + uint64_t logNumber; + WalFileType type; + + LogFile(uint64_t logNum,WalFileType logType) : + logNumber(logNum), + type(logType) {} + + LogFile(const LogFile& that) { + logNumber = that.logNumber; + type = that.type; + } + + bool operator < (const LogFile& that) const { + return logNumber < that.logNumber; + } + + std::string ToString() const { + char response[100]; + const char* typeOfLog; + if (type == kAliveLogFile) { + typeOfLog = "Alive Log"; + } else { + typeOfLog = "Archived Log"; + } + sprintf(response, + "LogNumber : %ld LogType : %s", + logNumber, + typeOfLog); + return std::string(response); + } +}; +} // namespace leveldb +#endif // STORAGE_LEVELDB_DB_LOG_FILE_H_ diff --git a/db/transaction_log_iterator_impl.cc b/db/transaction_log_iterator_impl.cc new file mode 100644 index 0000000000..e2781a5746 --- /dev/null +++ b/db/transaction_log_iterator_impl.cc @@ -0,0 +1,146 @@ +#include "db/transaction_log_iterator_impl.h" +#include "db/write_batch_internal.h" +#include "db/filename.h" +namespace leveldb { + +TransactionLogIteratorImpl::TransactionLogIteratorImpl( + const std::string& dbname, + const Options* options, + SequenceNumber& seq, + std::vector* files) : + dbname_(dbname), + options_(options), + sequenceNumber_(seq), + files_(files), + started_(false), + isValid_(true), + currentFileIndex_(0), + currentLogReader_(NULL) { + assert( files_ != NULL); + } + +LogReporter +TransactionLogIteratorImpl::NewLogReporter(const uint64_t logNumber) { + LogReporter reporter; + reporter.env = options_->env; + reporter.info_log = options_->info_log; + reporter.log_number = logNumber; + return reporter; +} + +Status TransactionLogIteratorImpl::OpenLogFile(const LogFile& logFile, + SequentialFile** file) { + Env* env = options_->env; + if (logFile.type == kArchivedLogFile) { + std::string fname = ArchivedLogFileName(dbname_, logFile.logNumber); + return env->NewSequentialFile(fname, file); + } else { + std::string fname = LogFileName(dbname_, logFile.logNumber); + Status status = env->NewSequentialFile(fname, file); + if (!status.ok()) { + // If cannot open file in DB directory. + // Try the archive dir, as it could have moved in the meanwhile. + fname = ArchivedLogFileName(dbname_, logFile.logNumber); + status = env->NewSequentialFile(fname, file); + if (!status.ok()) { + // TODO stringprintf + return Status::IOError(" Requested file not present in the dir"); + } + } + return status; + } +} + +void TransactionLogIteratorImpl::GetBatch(WriteBatch* batch) { + assert(isValid_); // cannot call in a non valid state. + WriteBatchInternal::SetContents(batch, currentRecord_); +} + +Status TransactionLogIteratorImpl::status() { + return currentStatus_; +} + +bool TransactionLogIteratorImpl::Valid() { + return started_ && isValid_; +} + +void TransactionLogIteratorImpl::Next() { +// First seek to the given seqNo. in the current file. + LogFile currentLogFile = files_->at(currentFileIndex_); + LogReporter reporter = NewLogReporter(currentLogFile.logNumber); + std::string scratch; + Slice record; + if (!started_) { + SequentialFile* file = NULL; + Status status = OpenLogFile(currentLogFile, &file); + if (!status.ok()) { + isValid_ = false; + currentStatus_ = status; + return; + } + assert(file != NULL); + WriteBatch batch; + log::Reader* reader = new log::Reader(file, &reporter, true, 0); + assert(reader != NULL); + while (reader->ReadRecord(&record, &scratch)) { + if (record.size() < 12) { + reporter.Corruption( + record.size(), Status::Corruption("log record too small")); + continue; + } + WriteBatchInternal::SetContents(&batch, record); + SequenceNumber currentNum = WriteBatchInternal::Sequence(&batch); + if (currentNum >= sequenceNumber_) { + isValid_ = true; + currentRecord_ = record; + currentLogReader_ = reader; + break; + } + } + if (!isValid_) { + // TODO read the entire first file. and did not find the seq number. + // Error out. + currentStatus_ = + Status::NotFound("Did not find the Seq no. in first file"); + } + started_ = true; + } else { +LOOK_NEXT_FILE: + assert(currentLogReader_ != NULL); + bool openNextFile = true; + while (currentLogReader_->ReadRecord(&record, &scratch)) { + if (record.size() < 12) { + reporter.Corruption( + record.size(), Status::Corruption("log record too small")); + continue; + } else { + currentRecord_ = record; + openNextFile = false; + break; + } + } + + if (openNextFile) { + if (currentFileIndex_ < files_->size() - 1) { + ++currentFileIndex_; + delete currentLogReader_; + SequentialFile *file; + Status status = OpenLogFile(files_->at(currentFileIndex_), &file); + if (!status.ok()) { + isValid_ = false; + currentStatus_ = status; + return; + } + currentLogReader_ = new log::Reader(file, &reporter, true, 0); + goto LOOK_NEXT_FILE; + } else { + // LOOKED AT FILES. WE ARE DONE HERE. + isValid_ = false; + currentStatus_ = Status::IOError(" NO MORE DATA LEFT"); + } + } + + } +} + +} // namespace leveldb diff --git a/db/transaction_log_iterator_impl.h b/db/transaction_log_iterator_impl.h new file mode 100644 index 0000000000..38b344f63a --- /dev/null +++ b/db/transaction_log_iterator_impl.h @@ -0,0 +1,66 @@ +// Copyright 2008-present Facebook. All Rights Reserved. +#ifndef STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_ +#define STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_ + +#include + +#include "leveldb/env.h" +#include "leveldb/options.h" +#include "leveldb/types.h" +#include "leveldb/transaction_log_iterator.h" +#include "db/log_file.h" +#include "db/log_reader.h" + +namespace leveldb { + +struct LogReporter : public log::Reader::Reporter { + Env* env; + Logger* info_log; + uint64_t log_number; + virtual void Corruption(size_t bytes, const Status& s) { + Log(info_log, "%ld: dropping %d bytes; %s", + log_number, static_cast(bytes), s.ToString().c_str()); + } +}; + +class TransactionLogIteratorImpl : public TransactionLogIterator { + public: + TransactionLogIteratorImpl(const std::string& dbname, + const Options* options, + SequenceNumber& seqNum, + std::vector* files); + virtual ~TransactionLogIteratorImpl() { + // TODO move to cc file. + if (currentLogReader_ != NULL) { + delete currentLogReader_; + } + delete files_; + } + + virtual bool Valid(); + + virtual void Next(); + + virtual Status status(); + + virtual void GetBatch(WriteBatch* batch); + + private: + const std::string& dbname_; + const Options* options_; + const uint64_t sequenceNumber_; + const std::vector* files_; + bool started_; + bool isValid_; // not valid when it starts of. + Status currentStatus_; + size_t currentFileIndex_; + Slice currentRecord_; + log::Reader* currentLogReader_; + Status OpenLogFile(const LogFile& logFile, SequentialFile** file); + LogReporter NewLogReporter(uint64_t logNumber); +}; + + + +} // namespace leveldb +#endif // STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_ diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 4423a7f318..eb37733c22 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -5,6 +5,7 @@ #ifndef STORAGE_LEVELDB_DB_WRITE_BATCH_INTERNAL_H_ #define STORAGE_LEVELDB_DB_WRITE_BATCH_INTERNAL_H_ +#include "leveldb/types.h" #include "leveldb/write_batch.h" namespace leveldb { diff --git a/include/leveldb/db.h b/include/leveldb/db.h index e4e2a30c45..fb691b49bb 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -10,6 +10,8 @@ #include #include "leveldb/iterator.h" #include "leveldb/options.h" +#include "leveldb/types.h" +#include "leveldb/transaction_log_iterator.h" namespace leveldb { @@ -180,6 +182,19 @@ class DB { virtual Status GetLiveFiles(std::vector&, uint64_t* manifest_file_size) = 0; + // Return's an iterator for all writes since the sequence number + // Status::ok if iterator is valid. + // The iterator internally holds references to the available log files. + // It automatically takes care of closing a file with no-updates left, and + // opening the next one. + // If the sequence number is non existent. it returns an iterator at a seq_no + // just greater than the requested seq_no. + // Must set WAL_ttl_seconds to a large value to use this api. + // else the WAL files will get + // cleared aggressively and the iterator might keep getting invalid before + // an update is read. + virtual Status GetUpdatesSince(SequenceNumber seq_number, + TransactionLogIterator** iter) = 0; private: // No copying allowed DB(const DB&); diff --git a/include/leveldb/transaction_log_iterator.h b/include/leveldb/transaction_log_iterator.h new file mode 100644 index 0000000000..8f46cac807 --- /dev/null +++ b/include/leveldb/transaction_log_iterator.h @@ -0,0 +1,34 @@ +// Copyright 2008-present Facebook. All Rights Reserved. +#ifndef STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_ +#define STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_ + +#include "leveldb/status.h" +#include "leveldb/write_batch.h" + +namespace leveldb { + + +// A TransactionLogIterator is used to iterate over the Transaction's in a db. +class TransactionLogIterator { + public: + TransactionLogIterator() {} + virtual ~TransactionLogIterator() {} + + // An iterator is either positioned at a WriteBatch or not valid. + // This method returns true if the iterator is valid. + virtual bool Valid() = 0; + + // Moves the iterator to the next WriteBatch. + // REQUIRES: Valid() to be true. + virtual void Next() = 0; + + // Return's ok if the iterator is in a valid stated. + // Return the Error Status when the iterator is not Valid. + virtual Status status() = 0; + + // If valid return's the current write_batch. + virtual void GetBatch(WriteBatch* batch) = 0; +}; +} // namespace leveldb + +#endif // STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_ diff --git a/include/leveldb/types.h b/include/leveldb/types.h new file mode 100644 index 0000000000..6ec058c2e1 --- /dev/null +++ b/include/leveldb/types.h @@ -0,0 +1,14 @@ +#ifndef STORAGE_LEVELDB_INCLUDE_TYPES_H_ +#define STORAGE_LEVELDB_INCLUDE_TYPES_H_ + +#include + +namespace leveldb { + +// Define all public custom types here. + +// Represents a sequence number in a WAL file. +typedef uint64_t SequenceNumber; + +} // namespace leveldb +#endif // STORAGE_LEVELDB_INCLUDE_TYPES_H_