diff --git a/.gitignore b/.gitignore index 23454ca877..976a6f1ba6 100644 --- a/.gitignore +++ b/.gitignore @@ -7,26 +7,27 @@ *.o *_test # specific files -aclocal.m4 -autom4te.cache/ -benchmark -config.guess -config.h -config.h.in -config.log -config.status -config.sub -configure -db_bench -depcomp -install-sh -leveldbutil -libhyperleveldb.pc -libtool -ltmain.sh -m4/ -Makefile -Makefile.in -Makefile.old -missing -stamp-h1 +/aclocal.m4 +/autom4te.cache/ +/benchmark +/config.guess +/config.h +/config.h.in +/config.log +/config.status +/config.sub +/configure +/db_bench +/depcomp +/hyperleveldb.upack +/install-sh +/leveldbutil +/libhyperleveldb.pc +/libtool +/ltmain.sh +/m4/ +/Makefile +/Makefile.in +/Makefile.old +/missing +/stamp-h1 diff --git a/AUTHORS b/AUTHORS index 6078f62e80..f1d3f3d672 100644 --- a/AUTHORS +++ b/AUTHORS @@ -7,5 +7,8 @@ Google Inc. Jeffrey Dean Sanjay Ghemawat +# Partial list of contributors: +Kevin Regan + # HyperLevelDB authors: Robert Escriva diff --git a/Makefile.am b/Makefile.am index da8dc7e75c..d65d0a65c5 100644 --- a/Makefile.am +++ b/Makefile.am @@ -26,8 +26,8 @@ ## POSSIBILITY OF SUCH DAMAGE. 
ACLOCAL_AMFLAGS = -I m4 ${ACLOCAL_FLAGS} -AM_CFLAGS = -DLEVELDB_PLATFORM_POSIX -fno-builtin-memcmp -fno-builtin-memmove -AM_CXXFLAGS = -DLEVELDB_PLATFORM_POSIX -fno-builtin-memcmp -fno-builtin-memmove +AM_CFLAGS = -DLEVELDB_PLATFORM_POSIX $(SNAPPY_FLAGS) -fno-builtin-memcmp -fno-builtin-memmove +AM_CXXFLAGS = -DLEVELDB_PLATFORM_POSIX $(SNAPPY_FLAGS) -fno-builtin-memcmp -fno-builtin-memmove AM_MAKEFLAGS = --no-print-directory pkgconfigdir = $(libdir)/pkgconfig @@ -35,11 +35,19 @@ pkgconfig_DATA = libhyperleveldb.pc EXTRA_DIST = EXTRA_DIST += AUTHORS +EXTRA_DIST += doc/benchmark.html +EXTRA_DIST += doc/doc.css +EXTRA_DIST += doc/impl.html +EXTRA_DIST += doc/index.html +EXTRA_DIST += doc/log_format.txt +EXTRA_DIST += doc/table_format.txt +EXTRA_DIST += helpers/memenv/memenv.cc +EXTRA_DIST += helpers/memenv/memenv_test.cc EXTRA_DIST += LICENSE EXTRA_DIST += NEWS +EXTRA_DIST += port/README EXTRA_DIST += README EXTRA_DIST += TODO -EXTRA_DIST += port/README nobase_include_HEADERS = nobase_include_HEADERS += hyperleveldb/cache.h @@ -51,6 +59,7 @@ nobase_include_HEADERS += hyperleveldb/filter_policy.h nobase_include_HEADERS += hyperleveldb/iterator.h nobase_include_HEADERS += hyperleveldb/options.h nobase_include_HEADERS += hyperleveldb/slice.h +nobase_include_HEADERS += hyperleveldb/replay_iterator.h nobase_include_HEADERS += hyperleveldb/status.h nobase_include_HEADERS += hyperleveldb/table_builder.h nobase_include_HEADERS += hyperleveldb/table.h @@ -66,6 +75,7 @@ noinst_HEADERS += db/log_reader.h noinst_HEADERS += db/log_writer.h noinst_HEADERS += db/memtable.h noinst_HEADERS += db/skiplist.h +noinst_HEADERS += db/replay_iterator.h noinst_HEADERS += db/snapshot.h noinst_HEADERS += db/table_cache.h noinst_HEADERS += db/version_edit.h @@ -110,6 +120,7 @@ libhyperleveldb_la_SOURCES += db/log_reader.cc libhyperleveldb_la_SOURCES += db/log_writer.cc libhyperleveldb_la_SOURCES += db/memtable.cc libhyperleveldb_la_SOURCES += db/repair.cc +libhyperleveldb_la_SOURCES += 
db/replay_iterator.cc libhyperleveldb_la_SOURCES += db/table_cache.cc libhyperleveldb_la_SOURCES += db/version_edit.cc libhyperleveldb_la_SOURCES += db/version_set.cc @@ -138,6 +149,7 @@ libhyperleveldb_la_SOURCES += util/logging.cc libhyperleveldb_la_SOURCES += util/options.cc libhyperleveldb_la_SOURCES += util/status.cc libhyperleveldb_la_SOURCES += port/port_posix.cc +libhyperleveldb_la_LIBADD = $(SNAPPY_LIBS) -lpthread libhyperleveldb_la_LDFLAGS = -pthread TESTUTIL = util/testutil.cc @@ -153,6 +165,7 @@ EXTRA_PROGRAMS += db_bench_sqlite3 EXTRA_PROGRAMS += db_bench_tree_db check_PROGRAMS = +check_PROGRAMS += autocompact_test check_PROGRAMS += arena_test check_PROGRAMS += bloom_test check_PROGRAMS += c_test @@ -172,6 +185,7 @@ check_PROGRAMS += version_edit_test check_PROGRAMS += version_set_test check_PROGRAMS += write_batch_test check_PROGRAMS += issue178_test +check_PROGRAMS += issue200_test TESTS = $(check_PROGRAMS) @@ -191,6 +205,9 @@ db_bench_tree_db_LDADD = -lkyotocabinet leveldbutil_SOURCES = db/leveldb_main.cc leveldbutil_LDADD = libhyperleveldb.la -lpthread +autocompact_test_SOURCES = db/autocompact_test.cc $(TESTHARNESS) +autocompact_test_LDADD = libhyperleveldb.la -lpthread + arena_test_SOURCES = util/arena_test.cc $(TESTHARNESS) arena_test_LDADD = libhyperleveldb.la -lpthread @@ -247,3 +264,6 @@ write_batch_test_LDADD = libhyperleveldb.la -lpthread issue178_test_SOURCES = issues/issue178_test.cc $(TESTHARNESS) issue178_test_LDADD = libhyperleveldb.la -lpthread + +issue200_test_SOURCES = issues/issue200_test.cc $(TESTHARNESS) +issue200_test_LDADD = libhyperleveldb.la -lpthread diff --git a/README b/README index 3618adeeed..1b95ba2dc9 100644 --- a/README +++ b/README @@ -49,3 +49,14 @@ include/env.h include/table.h include/table_builder.h Lower-level modules that most clients probably won't use directly + +Install +======= + +Get up and running quickly: + + $ autoreconf -i + $ ./configure + $ make + # make install + # ldconfig diff --git a/benchmark.cc 
b/benchmark.cc index 656371ff70..c025c3de57 100644 --- a/benchmark.cc +++ b/benchmark.cc @@ -42,7 +42,8 @@ #include // e -#include +#include +#include // armnod #include @@ -50,17 +51,9 @@ // numbers #include -static long _done = 0; -static long _number = 1000000; -static long _threads = 1; - -static struct poptOption _popts[] = { - {"number", 'n', POPT_ARG_LONG, &_number, 0, - "perform N operations against the database (default: 1000000)", "N"}, - {"threads", 't', POPT_ARG_LONG, &_threads, 0, - "run the test with T concurrent threads (default: 1)", "T"}, - POPT_TABLEEND -}; +static void +backup_thread(leveldb::DB*, + numbers::throughput_latency_logger* tll); static void worker_thread(leveldb::DB*, @@ -68,50 +61,55 @@ worker_thread(leveldb::DB*, const armnod::argparser& k, const armnod::argparser& v); +static long _done = 0; +static long _number = 1000000; +static long _threads = 1; +static long _backup = 0; +static long _write_buf = 64ULL * 1024ULL * 1024ULL; +static const char* _output = "benchmark.log"; +static const char* _dir = "."; + int main(int argc, const char* argv[]) { + e::argparser ap; + ap.autohelp(); + ap.arg().name('n', "number") + .description("perform N operations against the database (default: 1000000)") + .metavar("N") + .as_long(&_number); + ap.arg().name('t', "threads") + .description("run the test with T concurrent threads (default: 1)") + .metavar("T") + .as_long(&_threads); + ap.arg().name('o', "output") + .description("output file for benchmark results (default: benchmark.log)") + .as_string(&_output); + ap.arg().name('d', "db-dir") + .description("directory for leveldb storage (default: .)") + .as_string(&_dir); + ap.arg().name('w', "write-buffer") + .description("write buffer size (default: 64MB)") + .as_long(&_write_buf); + ap.arg().name('b', "backup") + .description("perform a live backup every N seconds (default: 0 (no backup))") + .as_long(&_backup); armnod::argparser key_parser("key-"); armnod::argparser value_parser("value-"); - 
std::vector popts; - poptOption s[] = {POPT_AUTOHELP {NULL, 0, POPT_ARG_INCLUDE_TABLE, _popts, 0, "Benchmark:", NULL}, POPT_TABLEEND}; - popts.push_back(s[0]); - popts.push_back(s[1]); - popts.push_back(key_parser.options("Key Generation:")); - popts.push_back(value_parser.options("Value Generation:")); - popts.push_back(s[2]); - poptContext poptcon; - poptcon = poptGetContext(NULL, argc, argv, &popts.front(), POPT_CONTEXT_POSIXMEHARDER); - e::guard g = e::makeguard(poptFreeContext, poptcon); g.use_variable(); - poptSetOtherOptionHelp(poptcon, "[OPTIONS]"); + ap.add("Key Generation:", key_parser.parser()); + ap.add("Value Generation:", value_parser.parser()); - int rc; - - while ((rc = poptGetNextOpt(poptcon)) != -1) + if (!ap.parse(argc, argv)) { - switch (rc) - { - case POPT_ERROR_NOARG: - case POPT_ERROR_BADOPT: - case POPT_ERROR_BADNUMBER: - case POPT_ERROR_OVERFLOW: - std::cerr << poptStrerror(rc) << " " << poptBadOption(poptcon, 0) << std::endl; - return EXIT_FAILURE; - case POPT_ERROR_OPTSTOODEEP: - case POPT_ERROR_BADQUOTE: - case POPT_ERROR_ERRNO: - default: - std::cerr << "logic error in argument parsing" << std::endl; - return EXIT_FAILURE; - } + return EXIT_FAILURE; } leveldb::Options opts; opts.create_if_missing = true; - opts.write_buffer_size = 64ULL * 1024ULL * 1024ULL; + opts.write_buffer_size = _write_buf; opts.filter_policy = leveldb::NewBloomFilterPolicy(10); leveldb::DB* db; - leveldb::Status st = leveldb::DB::Open(opts, "tmp", &db); + leveldb::Status st = leveldb::DB::Open(opts, _dir, &db); if (!st.ok()) { @@ -121,7 +119,7 @@ main(int argc, const char* argv[]) numbers::throughput_latency_logger tll; - if (!tll.open("benchmark.log")) + if (!tll.open(_output)) { std::cerr << "could not open log: " << strerror(errno) << std::endl; return EXIT_FAILURE; @@ -130,6 +128,13 @@ main(int argc, const char* argv[]) typedef std::tr1::shared_ptr thread_ptr; std::vector threads; + if (_backup > 0) + { + thread_ptr t(new
po6::threads::thread(std::tr1::bind(backup_thread, db, &tll))); + threads.push_back(t); + t->start(); + } + for (size_t i = 0; i < _threads; ++i) { thread_ptr t(new po6::threads::thread(std::tr1::bind(worker_thread, db, &tll, key_parser, value_parser))); @@ -175,6 +180,47 @@ get_random() return ret; } +#define BILLION (1000ULL * 1000ULL * 1000ULL) + +void +backup_thread(leveldb::DB* db, + numbers::throughput_latency_logger* tll) +{ + uint64_t target = e::time() / BILLION; + target += _backup; + uint64_t idx = 0; + numbers::throughput_latency_logger::thread_state ts; + tll->initialize_thread(&ts); + + while (__sync_fetch_and_add(&_done, 0) < _number) + { + uint64_t now = e::time() / BILLION; + + if (now < target) + { + timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = 250ULL * 1000ULL * 1000ULL; + nanosleep(&ts, NULL); + } + else + { + target = now + _backup; + char buf[32]; + snprintf(buf, 32, "%05lu", idx); + buf[31] = '\0'; + leveldb::Slice name(buf); + leveldb::Status st; + + tll->start(&ts, 4); + st = db->LiveBackup(name); + tll->finish(&ts); + assert(st.ok()); + ++idx; + } + } +} + void worker_thread(leveldb::DB* db, numbers::throughput_latency_logger* tll, @@ -196,13 +242,15 @@ worker_thread(leveldb::DB* db, // issue a "get" std::string tmp; leveldb::ReadOptions ropts; + tll->start(&ts, 1); leveldb::Status rst = db->Get(ropts, leveldb::Slice(k.data(), k.size()), &tmp); + tll->finish(&ts); assert(rst.ok() || rst.IsNotFound()); // issue a "put" leveldb::WriteOptions wopts; wopts.sync = false; - tll->start(&ts, 0); + tll->start(&ts, 2); leveldb::Status wst = db->Put(wopts, leveldb::Slice(k.data(), k.size()), leveldb::Slice(v.data(), v.size())); tll->finish(&ts); assert(wst.ok()); diff --git a/configure.ac b/configure.ac index 744538a8d9..dd3c756706 100644 --- a/configure.ac +++ b/configure.ac @@ -30,7 +30,7 @@ # Process this file with autoconf to produce a configure script. 
AC_PREREQ([2.62]) -AC_INIT([hyperleveldb], [2.0.dev], [robert@hyperdex.org]) +AC_INIT([hyperleveldb], [1.0.dev], [robert@hyperdex.org]) AM_INIT_AUTOMAKE([foreign subdir-objects dist-bzip2]) m4_ifdef([AM_SILENT_RULES], [AM_SILENT_RULES([yes])]) LT_PREREQ([2.2]) @@ -58,6 +58,40 @@ AC_FUNC_MMAP AC_CHECK_FUNCS([alarm clock_gettime mach_absolute_time ftruncate memmove mkdir munmap rmdir socket]) # Optional components +snappy_detect_hdr=yes +snappy_detect_lib=yes +AC_CHECK_LIB([snappy], [snappy_compress], [], [snappy_detect_hdr=no]) +AC_CHECK_HEADER([snappy.h],,[snappy_detect_lib=no]) +AC_ARG_ENABLE([snappy], [AS_HELP_STRING([--enable-snappy], + [build with Snappy @<:@default: auto@:>@])], + [snappy=${enableval}], [snappy=no]) +if test x"${snappy}" = xyes; then + if test x"${snappy_detect_hdr}" != xyes; then + AC_MSG_ERROR([ +------------------------------------------------- +LevelDB configured with the Snappy library. +libsnappy.so not found +Please install Snappy to continue. +-------------------------------------------------]) + fi + if test x"${snappy_detect_lib}" != xyes; then + AC_MSG_ERROR([ +------------------------------------------------- +LevelDB configured with the Snappy library. +snappy.h not found +Please install Snappy to continue. +-------------------------------------------------]) + fi +fi +if test x"${snappy_detect_hdr}" = xyes -a x"${snappy_detect_lib}" = xyes; then +SNAPPY_FLAGS=-DSNAPPY +SNAPPY_LIBS=-lsnappy +else +SNAPPY_FLAGS= +SNAPPY_LIBS= +fi +AC_SUBST(SNAPPY_FLAGS) +AC_SUBST(SNAPPY_LIBS) -AC_CONFIG_FILES([Makefile libhyperleveldb.pc]) +AC_CONFIG_FILES([Makefile libhyperleveldb.pc hyperleveldb.upack]) AC_OUTPUT diff --git a/db/autocompact_test.cc b/db/autocompact_test.cc new file mode 100644 index 0000000000..acf3c7d789 --- /dev/null +++ b/db/autocompact_test.cc @@ -0,0 +1,123 @@ +// Copyright (c) 2013 The LevelDB Authors. All rights reserved. 
+// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "hyperleveldb/db.h" +#include "db/db_impl.h" +#include "hyperleveldb/cache.h" +#include "util/testharness.h" +#include "util/testutil.h" + +namespace leveldb { + +class AutoCompactTest { + public: + std::string dbname_; + Cache* tiny_cache_; + Options options_; + DB* db_; + + AutoCompactTest() { + dbname_ = test::TmpDir() + "/autocompact_test"; + tiny_cache_ = NewLRUCache(100); + options_.block_cache = tiny_cache_; + DestroyDB(dbname_, options_); + options_.create_if_missing = true; + options_.compression = kNoCompression; + ASSERT_OK(DB::Open(options_, dbname_, &db_)); + } + + ~AutoCompactTest() { + delete db_; + DestroyDB(dbname_, Options()); + delete tiny_cache_; + } + + std::string Key(int i) { + char buf[100]; + snprintf(buf, sizeof(buf), "key%06d", i); + return std::string(buf); + } + + uint64_t Size(const Slice& start, const Slice& limit) { + Range r(start, limit); + uint64_t size; + db_->GetApproximateSizes(&r, 1, &size); + return size; + } + + void DoReads(int n); +}; + +static const int kValueSize = 200 * 1024; +static const int kTotalSize = 100 * 1024 * 1024; +static const int kCount = kTotalSize / kValueSize; + +// Read through the first n keys repeatedly and check that they get +// compacted (verified by checking the size of the key space). +void AutoCompactTest::DoReads(int n) { + std::string value(kValueSize, 'x'); + DBImpl* dbi = reinterpret_cast(db_); + + // Fill database + for (int i = 0; i < kCount; i++) { + ASSERT_OK(db_->Put(WriteOptions(), Key(i), value)); + } + ASSERT_OK(dbi->TEST_CompactMemTable()); + + // Delete everything + for (int i = 0; i < kCount; i++) { + ASSERT_OK(db_->Delete(WriteOptions(), Key(i))); + } + ASSERT_OK(dbi->TEST_CompactMemTable()); + + // Get initial measurement of the space we will be reading. 
+ const int64_t initial_size = Size(Key(0), Key(n)); + const int64_t initial_other_size = Size(Key(n), Key(kCount)); + + // Read until size drops significantly. + std::string limit_key = Key(n); + for (int read = 0; true; read++) { + ASSERT_LT(read, 100) << "Taking too long to compact"; + Iterator* iter = db_->NewIterator(ReadOptions()); + for (iter->SeekToFirst(); + iter->Valid() && iter->key().ToString() < limit_key; + iter->Next()) { + // Drop data + } + delete iter; + // Wait a little bit to allow any triggered compactions to complete. + Env::Default()->SleepForMicroseconds(1000000); + uint64_t size = Size(Key(0), Key(n)); + fprintf(stderr, "iter %3d => %7.3f MB [other %7.3f MB]\n", + read+1, size/1048576.0, Size(Key(n), Key(kCount))/1048576.0); + if (size <= initial_size/10) { + break; + } + } + + // Verify that the size of the key space not touched by the reads + // is pretty much unchanged. + const int64_t final_other_size = Size(Key(n), Key(kCount)); + ASSERT_LE(final_other_size, initial_other_size + 1048576); + ASSERT_GE(final_other_size, initial_other_size/5 - 1048576); +} + +TEST(AutoCompactTest, ReadAll) { + DoReads(kCount); +} + +// HyperLevelDB's ratio-driven compactions always compact everything here. The +// reads trigger the compaction, but then the system decides it is more +// efficient to just collect everything, emptying the db completely.
+#if 0 +TEST(AutoCompactTest, ReadHalf) { + DoReads(kCount/2); +} +#endif + +} // namespace leveldb + +int main(int argc, char** argv) { + return leveldb::test::RunAllTests(); +} diff --git a/db/corruption_test.cc b/db/corruption_test.cc index 1c80f46b14..1f44179c11 100644 --- a/db/corruption_test.cc +++ b/db/corruption_test.cc @@ -35,6 +35,7 @@ class CorruptionTest { CorruptionTest() { tiny_cache_ = NewLRUCache(100); options_.env = &env_; + options_.block_cache = tiny_cache_; dbname_ = test::TmpDir() + "/db_test"; DestroyDB(dbname_, options_); @@ -50,17 +51,14 @@ class CorruptionTest { delete tiny_cache_; } - Status TryReopen(Options* options = NULL) { + Status TryReopen() { delete db_; db_ = NULL; - Options opt = (options ? *options : options_); - opt.env = &env_; - opt.block_cache = tiny_cache_; - return DB::Open(opt, dbname_, &db_); + return DB::Open(options_, dbname_, &db_); } - void Reopen(Options* options = NULL) { - ASSERT_OK(TryReopen(options)); + void Reopen() { + ASSERT_OK(TryReopen()); } void RepairDB() { @@ -92,6 +90,10 @@ class CorruptionTest { for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { uint64_t key; Slice in(iter->key()); + if (in == "" || in == "~") { + // Ignore boundary keys. 
+ continue; + } if (!ConsumeDecimalNumber(&in, &key) || !in.empty() || key < next_expected) { @@ -233,7 +235,7 @@ TEST(CorruptionTest, TableFile) { dbi->TEST_CompactRange(1, NULL, NULL); Corrupt(kTableFile, 100, 1); - Check(99, 99); + Check(90, 99); } TEST(CorruptionTest, TableFileIndexData) { @@ -299,7 +301,7 @@ TEST(CorruptionTest, CompactionInputError) { ASSERT_EQ(1, Property("leveldb.num-files-at-level" + NumberToString(last))); Corrupt(kTableFile, 100, 1); - Check(9, 9); + Check(5, 9); // Force compactions by writing lots of values Build(10000); @@ -307,33 +309,23 @@ TEST(CorruptionTest, CompactionInputError) { } TEST(CorruptionTest, CompactionInputErrorParanoid) { - Options options; - options.paranoid_checks = true; - options.write_buffer_size = 1048576; - Reopen(&options); + options_.paranoid_checks = true; + options_.write_buffer_size = 512 << 10; + Reopen(); DBImpl* dbi = reinterpret_cast(db_); - // Fill levels >= 1 so memtable compaction outputs to level 1 - for (int level = 1; level < config::kNumLevels; level++) { - dbi->Put(WriteOptions(), "", "begin"); - dbi->Put(WriteOptions(), "~", "end"); + // Make multiple inputs so we need to compact. 
+ for (int i = 0; i < 2; i++) { + Build(10); dbi->TEST_CompactMemTable(); + Corrupt(kTableFile, 100, 1); + env_.SleepForMicroseconds(100000); } + dbi->CompactRange(NULL, NULL); - Build(10); - dbi->TEST_CompactMemTable(); - env_.SleepForMicroseconds(1000000); - ASSERT_EQ(1, Property("leveldb.num-files-at-level0")); - - Corrupt(kTableFile, 100, 1); - Check(9, 9); - - // Write must eventually fail because of corrupted table - Status s; + // Write must fail because of corrupted table std::string tmp1, tmp2; - for (int i = 0; i < 1000000 && s.ok(); i++) { - s = db_->Put(WriteOptions(), Key(i, &tmp1), Value(i, &tmp2)); - } + Status s = db_->Put(WriteOptions(), Key(5, &tmp1), Value(5, &tmp2)); ASSERT_TRUE(!s.ok()) << "write did not fail in corrupted paranoid db"; } diff --git a/db/db_impl.cc b/db/db_impl.cc index 613cba295c..84fff83c4f 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -5,11 +5,13 @@ #include "db_impl.h" #include +#include #include #include #include #include #include + #include "builder.h" #include "db_iter.h" #include "dbformat.h" @@ -17,11 +19,13 @@ #include "log_reader.h" #include "log_writer.h" #include "memtable.h" +#include "replay_iterator.h" #include "table_cache.h" #include "version_set.h" #include "write_batch_internal.h" #include "../hyperleveldb/db.h" #include "../hyperleveldb/env.h" +#include "../hyperleveldb/replay_iterator.h" #include "../hyperleveldb/status.h" #include "../hyperleveldb/table.h" #include "../hyperleveldb/table_builder.h" @@ -35,6 +39,8 @@ namespace hyperleveldb { +const int kStraightReads = 50; + const int kNumNonTableCacheFiles = 10; struct DBImpl::CompactionState { @@ -102,22 +108,23 @@ Options SanitizeOptions(const std::string& dbname, return result; } -DBImpl::DBImpl(const Options& options, const std::string& dbname) - : env_(options.env), - internal_comparator_(options.comparator), - internal_filter_policy_(options.filter_policy), - options_(SanitizeOptions( - dbname, &internal_comparator_, &internal_filter_policy_, 
options)), - owns_info_log_(options_.info_log != options.info_log), - owns_cache_(options_.block_cache != options.block_cache), +DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) + : env_(raw_options.env), + internal_comparator_(raw_options.comparator), + internal_filter_policy_(raw_options.filter_policy), + options_(SanitizeOptions(dbname, &internal_comparator_, + &internal_filter_policy_, raw_options)), + owns_info_log_(options_.info_log != raw_options.info_log), + owns_cache_(options_.block_cache != raw_options.block_cache), dbname_(dbname), db_lock_(NULL), shutting_down_(NULL), mem_(new MemTable(internal_comparator_)), imm_(NULL), - logfile_(NULL), + logfile_(), logfile_number_(0), - log_(NULL), + log_(), + seed_(0), writers_lower_(0), writers_upper_(0), bg_fg_cv_(&mutex_), @@ -130,17 +137,24 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) bg_log_cv_(&mutex_), bg_log_occupied_(false), manual_compaction_(NULL), + manual_garbage_cutoff_(raw_options.manual_garbage_collection ? + SequenceNumber(0) : kMaxSequenceNumber), + straight_reads_(0), + backup_cv_(&mutex_), + backup_in_progress_(), + backup_deferred_delete_(), consecutive_compaction_errors_(0) { mutex_.Lock(); mem_->Ref(); has_imm_.Release_Store(NULL); + backup_in_progress_.Release_Store(NULL); env_->StartThread(&DBImpl::CompactMemTableWrapper, this); env_->StartThread(&DBImpl::CompactOptimisticWrapper, this); env_->StartThread(&DBImpl::CompactLevelWrapper, this); num_bg_threads_ = 3; // Reserve ten files or so for other uses and give the rest to TableCache. 
- const int table_cache_size = options.max_open_files - kNumNonTableCacheFiles; + const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles; table_cache_ = new TableCache(dbname_, &options_, table_cache_size); versions_ = new VersionSet(dbname_, &options_, table_cache_, &internal_comparator_); @@ -170,8 +184,8 @@ DBImpl::~DBImpl() { delete versions_; if (mem_ != NULL) mem_->Unref(); if (imm_ != NULL) imm_->Unref(); - delete log_; - delete logfile_; + log_.reset(); + logfile_.reset(); delete table_cache_; if (owns_info_log_) { @@ -224,6 +238,16 @@ void DBImpl::MaybeIgnoreError(Status* s) const { } void DBImpl::DeleteObsoleteFiles() { + // Defer if there's background activity + mutex_.AssertHeld(); + if (backup_in_progress_.Acquire_Load() != NULL) { + backup_deferred_delete_ = true; + return; + } + + // If you ever release mutex_ in this function, you'll need to do more work in + // LiveBackup + // Make a set of all of the live files std::set live = pending_outputs_; versions_->AddLiveFiles(&live); @@ -651,7 +675,7 @@ void DBImpl::CompactLevelThread() { while (!shutting_down_.Acquire_Load()) { while (!shutting_down_.Acquire_Load() && manual_compaction_ == NULL && - !versions_->NeedsCompaction(levels_locked_)) { + !versions_->NeedsCompaction(levels_locked_, straight_reads_ > kStraightReads)) { bg_compaction_cv_.Wait(); } if (shutting_down_.Acquire_Load()) { @@ -709,9 +733,9 @@ Status DBImpl::BackgroundCompaction() { (m->end ? m->end->DebugString().c_str() : "(end)"), (m->done ? 
"(end)" : manual_end.DebugString().c_str())); } else { - int level = versions_->PickCompactionLevel(levels_locked_); + int level = versions_->PickCompactionLevel(levels_locked_, straight_reads_ > kStraightReads); if (level != config::kNumLevels) { - c = versions_->PickCompaction(level); + c = versions_->PickCompaction(versions_->current(), level); } if (c) { assert(!levels_locked_[c->level() + 0]); @@ -831,7 +855,7 @@ Status DBImpl::OptimisticCompaction() { if (levels_locked_[level] || levels_locked_[level + 1]) { continue; } - Compaction* tmp = versions_->PickCompaction(level); + Compaction* tmp = versions_->PickCompaction(versions_->current(), level); if (tmp && tmp->IsTrivialMove()) { if (c) { delete c; @@ -1055,6 +1079,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { std::string current_user_key; bool has_current_user_key = false; SequenceNumber last_sequence_for_key = kMaxSequenceNumber; + uint64_t i = 0; for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { Slice key = input->key(); // Handle key/value, add to state, etc. @@ -1074,6 +1099,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { last_sequence_for_key = kMaxSequenceNumber; } + // Just remember that last_sequence_for_key is decreasing over time, and + // all of this makes sense. + if (last_sequence_for_key <= compact->smallest_snapshot) { // Hidden by an newer entry for same user key drop = true; // (A) @@ -1090,6 +1118,15 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { drop = true; } + // If we're going to drop this key, and there was no previous version of + // this key, and it was written at or after the garbage cutoff, we keep + // it. 
+ if (drop && + last_sequence_for_key == kMaxSequenceNumber && + ikey.sequence >= manual_garbage_cutoff_) { + drop = false; + } + last_sequence_for_key = ikey.sequence; } @@ -1174,10 +1211,14 @@ static void CleanupIteratorState(void* arg1, void* arg2) { } } // namespace -Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, - SequenceNumber* latest_snapshot) { +Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, uint64_t number, + SequenceNumber* latest_snapshot, + uint32_t* seed, bool external_sync) { IterState* cleanup = new IterState; - mutex_.Lock(); + if (!external_sync) { + mutex_.Lock(); + } + ++straight_reads_; *latest_snapshot = versions_->LastSequence(); // Collect together all needed child iterators @@ -1188,7 +1229,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, list.push_back(imm_->NewIterator()); imm_->Ref(); } - versions_->current()->AddIterators(options, &list); + versions_->current()->AddSomeIterators(options, number, &list); Iterator* internal_iter = NewMergingIterator(&internal_comparator_, &list[0], list.size()); versions_->current()->Ref(); @@ -1199,13 +1240,17 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, cleanup->version = versions_->current(); internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL); - mutex_.Unlock(); + *seed = ++seed_; + if (!external_sync) { + mutex_.Unlock(); + } return internal_iter; } Iterator* DBImpl::TEST_NewInternalIterator() { SequenceNumber ignored; - return NewInternalIterator(ReadOptions(), &ignored); + uint32_t ignored_seed; + return NewInternalIterator(ReadOptions(), 0, &ignored, &ignored_seed, false); } int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { @@ -1251,6 +1296,10 @@ Status DBImpl::Get(const ReadOptions& options, mutex_.Lock(); } + if (have_stat_update && current->UpdateStats(stats)) { + bg_compaction_cv_.Signal(); + } + ++straight_reads_; mem->Unref(); if (imm != NULL) imm->Unref(); current->Unref(); @@ -1259,12 
+1308,157 @@ Status DBImpl::Get(const ReadOptions& options, Iterator* DBImpl::NewIterator(const ReadOptions& options) { SequenceNumber latest_snapshot; - Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot); + uint32_t seed; + Iterator* iter = NewInternalIterator(options, 0, &latest_snapshot, &seed, false); return NewDBIterator( - &dbname_, env_, user_comparator(), internal_iter, + this, user_comparator(), iter, (options.snapshot != NULL ? reinterpret_cast(options.snapshot)->number_ - : latest_snapshot)); + : latest_snapshot), + seed); +} + +void DBImpl::GetReplayTimestamp(std::string* timestamp) { + uint64_t file = 0; + uint64_t seqno = 0; + + { + MutexLock l(&mutex_); + file = versions_->NewFileNumber(); + versions_->ReuseFileNumber(file); + seqno = versions_->LastSequence(); + } + + timestamp->clear(); + PutVarint64(timestamp, file); + PutVarint64(timestamp, seqno); +} + +void DBImpl::AllowGarbageCollectBeforeTimestamp(const std::string& timestamp) { + Slice ts_slice(timestamp); + uint64_t file = 0; + uint64_t seqno = 0; + + if (timestamp == "all") { + // keep zeroes + } else if (timestamp == "now") { + MutexLock l(&mutex_); + seqno = versions_->LastSequence(); + if (manual_garbage_cutoff_ < seqno) { + manual_garbage_cutoff_ = seqno; + } + } else if (GetVarint64(&ts_slice, &file) && + GetVarint64(&ts_slice, &seqno)) { + MutexLock l(&mutex_); + if (manual_garbage_cutoff_ < seqno) { + manual_garbage_cutoff_ = seqno; + } + } +} + +bool DBImpl::ValidateTimestamp(const std::string& ts) { + uint64_t file = 0; + uint64_t seqno = 0; + Slice ts_slice(ts); + return ts == "all" || ts == "now" || + (GetVarint64(&ts_slice, &file) && + GetVarint64(&ts_slice, &seqno)); +} + +int DBImpl::CompareTimestamps(const std::string& lhs, const std::string& rhs) { + uint64_t now = 0; + uint64_t lhs_seqno = 0; + uint64_t rhs_seqno = 0; + uint64_t tmp; + if (lhs == "now" || rhs == "now") { + MutexLock l(&mutex_); + now = versions_->LastSequence(); + } + if (lhs == 
"all") { + lhs_seqno = 0; + } else if (lhs == "now") { + lhs_seqno = now; + } else { + Slice lhs_slice(lhs); + GetVarint64(&lhs_slice, &tmp); + GetVarint64(&lhs_slice, &lhs_seqno); + } + if (rhs == "all") { + rhs_seqno = 0; + } else if (rhs == "now") { + rhs_seqno = now; + } else { + Slice rhs_slice(rhs); + GetVarint64(&rhs_slice, &tmp); + GetVarint64(&rhs_slice, &rhs_seqno); + } + + if (lhs_seqno < rhs_seqno) { + return -1; + } else if (lhs_seqno > rhs_seqno) { + return 1; + } else { + return 0; + } +} + +Status DBImpl::GetReplayIterator(const std::string& timestamp, + ReplayIterator** iter) { + *iter = NULL; + Slice ts_slice(timestamp); + uint64_t file = 0; + uint64_t seqno = 0; + + if (timestamp == "all") { + seqno = 0; + } else if (timestamp == "now") { + MutexLock l(&mutex_); + file = versions_->NewFileNumber(); + versions_->ReuseFileNumber(file); + seqno = versions_->LastSequence(); + } else if (!GetVarint64(&ts_slice, &file) || + !GetVarint64(&ts_slice, &seqno)) { + return Status::InvalidArgument("Timestamp is not valid"); + } + + ReadOptions options; + SequenceNumber latest_snapshot; + uint32_t seed; + MutexLock l(&mutex_); + Iterator* internal_iter = NewInternalIterator(options, file, &latest_snapshot, &seed, true); + internal_iter->SeekToFirst(); + ReplayIteratorImpl* iterimpl; + iterimpl = new ReplayIteratorImpl( + this, &mutex_, user_comparator(), internal_iter, mem_, SequenceNumber(seqno)); + *iter = iterimpl; + replay_iters_.push_back(iterimpl); + return Status::OK(); +} + +void DBImpl::ReleaseReplayIterator(ReplayIterator* _iter) { + MutexLock l(&mutex_); + ReplayIteratorImpl* iter = reinterpret_cast(_iter); + for (std::list::iterator it = replay_iters_.begin(); + it != replay_iters_.end(); ++it) { + if (*it == iter) { + iter->cleanup(); // calls delete + replay_iters_.erase(it); + return; + } + } +} + +void DBImpl::RecordReadSample(Slice key) { + MutexLock l(&mutex_); + ++straight_reads_; + if (versions_->current()->RecordReadSample(key)) { + 
bg_compaction_cv_.Signal(); + } +} + +SequenceNumber DBImpl::LastSequence() { + MutexLock l(&mutex_); + return versions_->LastSequence(); } const Snapshot* DBImpl::GetSnapshot() { @@ -1294,11 +1488,11 @@ struct DBImpl::Writer { Writer* next; uint64_t start_sequence; uint64_t end_sequence; - WritableFile* logfile; - log::Writer* log; + std::tr1::shared_ptr logfile; + std::tr1::shared_ptr log; MemTable* mem; - WritableFile* old_logfile; - log::Writer* old_log; + std::tr1::shared_ptr old_logfile; + std::tr1::shared_ptr old_log; explicit Writer() : mtx(), @@ -1307,11 +1501,11 @@ struct DBImpl::Writer { next(NULL), start_sequence(0), end_sequence(0), - logfile(NULL), - log(NULL), + logfile(), + log(), mem(NULL), - old_logfile(NULL), - old_log(NULL) { + old_logfile(), + old_log() { } ~Writer() throw () { } @@ -1345,10 +1539,12 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { Status DBImpl::SequenceWriteBegin(Writer* w, WriteBatch* updates) { Status s; MutexLock l(&mutex_); + straight_reads_ = 0; bool force = updates == NULL; bool allow_delay = !force; - w->old_log = NULL; - w->old_logfile = NULL; + bool enqueue_mem = false; + w->old_log.reset(); + w->old_logfile.reset(); while (true) { if (!bg_error_.ok()) { @@ -1380,14 +1576,15 @@ Status DBImpl::SequenceWriteBegin(Writer* w, WriteBatch* updates) { } w->old_log = log_; w->old_logfile = logfile_; - logfile_ = lfile; + logfile_.reset(lfile); logfile_number_ = new_log_number; - log_ = new log::Writer(lfile); + log_.reset(new log::Writer(lfile)); imm_ = mem_; has_imm_.Release_Store(imm_); mem_ = new MemTable(internal_comparator_); mem_->Ref(); force = false; // Do not force another compaction if have room + enqueue_mem = true; break; } } @@ -1405,6 +1602,13 @@ Status DBImpl::SequenceWriteBegin(Writer* w, WriteBatch* updates) { w->mem->Ref(); } + if (enqueue_mem) { + for (std::list::iterator it = replay_iters_.begin(); + it != replay_iters_.end(); ++it) { + (*it)->enqueue(mem_, w->start_sequence); + 
} + } + return s; } @@ -1429,8 +1633,8 @@ void DBImpl::SequenceWriteEnd(Writer* w) { // must do in order: log, logfile if (w->old_log) { assert(w->old_logfile); - delete w->old_log; - delete w->old_logfile; + w->old_log.reset(); + w->old_logfile.reset(); bg_memtable_cv_.Signal(); } @@ -1520,6 +1724,107 @@ void DBImpl::GetApproximateSizes( } } +Status DBImpl::LiveBackup(const Slice& _name) { + Slice name = _name; + size_t name_sz = 0; + + for (; name_sz < name.size() && name.data()[name_sz] != '\0'; ++name_sz) + ; + + name = Slice(name.data(), name_sz); + std::set live; + uint64_t ticket = __sync_add_and_fetch(&writers_upper_, 1); + + while (__sync_fetch_and_add(&writers_lower_, 0) < ticket) + ; + + { + MutexLock l(&mutex_); + versions_->SetLastSequence(ticket); + while (backup_in_progress_.Acquire_Load() != NULL) { + backup_cv_.Wait(); + } + backup_in_progress_.Release_Store(this); + while (bg_log_occupied_) { + bg_log_cv_.Wait(); + } + bg_log_occupied_ = true; + // note that this logic assumes that DeleteObsoleteFiles never releases + // mutex_, so that once we release at this brace, we'll guarantee that it + // will see backup_in_progress_. If you change DeleteObsoleteFiles to + // release mutex_, you'll need to add some sort of synchronization in place + // of this text block. 
+ versions_->AddLiveFiles(&live); + __sync_fetch_and_add(&writers_lower_, 1); + } + + Status s; + std::vector filenames; + s = env_->GetChildren(dbname_, &filenames); + std::string backup_dir = dbname_ + "/backup-" + name.ToString() + "/"; + + if (s.ok()) { + s = env_->CreateDir(backup_dir); + } + + uint64_t number; + FileType type; + + for (size_t i = 0; i < filenames.size(); i++) { + if (!s.ok()) { + continue; + } + if (ParseFileName(filenames[i], &number, &type)) { + std::string src = dbname_ + "/" + filenames[i]; + std::string target = backup_dir + "/" + filenames[i]; + switch (type) { + case kLogFile: + case kDescriptorFile: + case kCurrentFile: + case kInfoLogFile: + s = env_->CopyFile(src, target); + break; + case kTableFile: + // If it's a file referenced by a version, we have logged that version + // and applied it. Our MANIFEST will reflect that, and the file + // number assigned to new files will be greater or equal, ensuring + // that they aren't overwritten. Any file not in "live" either exists + // past the current manifest (output of ongoing compaction) or so far + // in the past we don't care (we're going to delete it at the end of + // this backup). I'd rather play safe than sorry. + // + // Under no circumstances should you collapse this to a single + // LinkFile without the conditional as it has implications for backups + // that share hardlinks. Opening an older backup that has files + // hardlinked with newer backups will overwrite "immutable" files in + // the newer backups because they aren't in our manifest, and we do an + // open/write rather than a creat/rename. We avoid linking these + // files. 
+ if (live.find(number) != live.end()) { + s = env_->LinkFile(src, target); + } + break; + case kTempFile: + case kDBLockFile: + break; + } + } + } + + { + MutexLock l(&mutex_); + backup_in_progress_.Release_Store(NULL); + if (s.ok() && backup_deferred_delete_) { + DeleteObsoleteFiles(); + } + backup_deferred_delete_ = false; + bg_log_occupied_ = false; + bg_log_cv_.Signal(); + backup_cv_.Signal(); + } + return s; +} + // Default implementations of convenience methods that subclasses of DB // can call if they wish Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { @@ -1551,9 +1856,9 @@ Status DB::Open(const Options& options, const std::string& dbname, &lfile); if (s.ok()) { edit.SetLogNumber(new_log_number); - impl->logfile_ = lfile; + impl->logfile_.reset(lfile); impl->logfile_number_ = new_log_number; - impl->log_ = new log::Writer(lfile); + impl->log_.reset(new log::Writer(lfile)); s = impl->versions_->LogAndApply(&edit, &impl->mutex_, &impl->bg_log_cv_, &impl->bg_log_occupied_); } if (s.ok()) { diff --git a/db/db_impl.h b/db/db_impl.h index 49eb9b1f27..6ab4e086e2 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -6,10 +6,14 @@ #define STORAGE_HYPERLEVELDB_DB_DB_IMPL_H_ #include +#include #include +#include + #include "dbformat.h" #include "log_writer.h" #include "snapshot.h" +#include "replay_iterator.h" #include "../hyperleveldb/db.h" #include "../hyperleveldb/env.h" #include "../port/port.h" @@ -36,11 +40,19 @@ class DBImpl : public DB { const Slice& key, std::string* value); virtual Iterator* NewIterator(const ReadOptions&); + virtual void GetReplayTimestamp(std::string* timestamp); + virtual void AllowGarbageCollectBeforeTimestamp(const std::string& timestamp); + virtual bool ValidateTimestamp(const std::string& timestamp); + virtual int CompareTimestamps(const std::string& lhs, const std::string& rhs); + virtual Status GetReplayIterator(const std::string& timestamp, + ReplayIterator** iter); + virtual void 
ReleaseReplayIterator(ReplayIterator* iter); virtual const Snapshot* GetSnapshot(); virtual void ReleaseSnapshot(const Snapshot* snapshot); virtual bool GetProperty(const Slice& property, std::string* value); virtual void GetApproximateSizes(const Range* range, int n, uint64_t* sizes); virtual void CompactRange(const Slice* begin, const Slice* end); + virtual Status LiveBackup(const Slice& name); // Extra methods (for testing) that are not in the public DB interface @@ -59,13 +71,23 @@ class DBImpl : public DB { // file at a level >= 1. int64_t TEST_MaxNextLevelOverlappingBytes(); + // Record a sample of bytes read at the specified internal key. + // Samples are taken approximately once every config::kReadBytesPeriod + // bytes. + void RecordReadSample(Slice key); + + // Peek at the last sequence; + // REQUIRES: mutex_ not held + SequenceNumber LastSequence(); + private: friend class DB; struct CompactionState; struct Writer; - Iterator* NewInternalIterator(const ReadOptions&, - SequenceNumber* latest_snapshot); + Iterator* NewInternalIterator(const ReadOptions&, uint64_t number, + SequenceNumber* latest_snapshot, + uint32_t* seed, bool external_sync); Status NewDB(); @@ -139,9 +161,10 @@ class DBImpl : public DB { MemTable* mem_; MemTable* imm_; // Memtable being compacted port::AtomicPointer has_imm_; // So bg thread can detect non-NULL imm_ - WritableFile* logfile_; + std::tr1::shared_ptr logfile_; uint64_t logfile_number_; - log::Writer* log_; + std::tr1::shared_ptr log_; + uint32_t seed_; // For sampling. // Synchronize writers uint64_t __attribute__ ((aligned (8))) writers_lower_;
+ SequenceNumber manual_garbage_cutoff_; + + // replay iterators + std::list replay_iters_; + + // how many reads have we done in a row, uninterrupted by writes + uint64_t straight_reads_; + VersionSet* versions_; + // Information for ongoing backup processes + port::CondVar backup_cv_; + port::AtomicPointer backup_in_progress_; // non-NULL in progress + bool backup_deferred_delete_; // DeleteObsoleteFiles delayed by backup; protect with mutex_ + // Have we encountered a background error in paranoid mode? Status bg_error_; int consecutive_compaction_errors_; diff --git a/db/db_iter.cc b/db/db_iter.cc index 3816e185d8..e1d940fcf9 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -5,12 +5,14 @@ #include "db_iter.h" #include "filename.h" +#include "db_impl.h" #include "dbformat.h" #include "../hyperleveldb/env.h" #include "../hyperleveldb/iterator.h" #include "../port/port.h" #include "../util/logging.h" #include "../util/mutexlock.h" +#include "../util/random.h" namespace hyperleveldb { @@ -46,15 +48,16 @@ class DBIter: public Iterator { kReverse }; - DBIter(const std::string* dbname, Env* env, - const Comparator* cmp, Iterator* iter, SequenceNumber s) - : dbname_(dbname), - env_(env), + DBIter(DBImpl* db, const Comparator* cmp, Iterator* iter, SequenceNumber s, + uint32_t seed) + : db_(db), user_comparator_(cmp), iter_(iter), sequence_(s), direction_(kForward), - valid_(false) { + valid_(false), + rnd_(seed), + bytes_counter_(RandomPeriod()) { } virtual ~DBIter() { delete iter_; @@ -100,8 +103,12 @@ class DBIter: public Iterator { } } - const std::string* const dbname_; - Env* const env_; + // Pick next gap with average value of config::kReadBytesPeriod. 
+ ssize_t RandomPeriod() { + return rnd_.Uniform(2*config::kReadBytesPeriod); + } + + DBImpl* db_; const Comparator* const user_comparator_; Iterator* const iter_; SequenceNumber const sequence_; @@ -112,13 +119,23 @@ class DBIter: public Iterator { Direction direction_; bool valid_; + Random rnd_; + ssize_t bytes_counter_; + // No copying allowed DBIter(const DBIter&); void operator=(const DBIter&); }; inline bool DBIter::ParseKey(ParsedInternalKey* ikey) { - if (!ParseInternalKey(iter_->key(), ikey)) { + Slice k = iter_->key(); + ssize_t n = k.size() + iter_->value().size(); + bytes_counter_ -= n; + while (bytes_counter_ < 0) { + bytes_counter_ += RandomPeriod(); + db_->RecordReadSample(k); + } + if (!ParseInternalKey(k, ikey)) { status_ = Status::Corruption("corrupted internal key in DBIter"); return false; } else { @@ -129,6 +146,9 @@ inline bool DBIter::ParseKey(ParsedInternalKey* ikey) { void DBIter::Next() { assert(valid_); + // Temporarily use saved_key_ as storage for key to skip. + std::string* skip = &saved_key_; + if (direction_ == kReverse) { // Switch directions? direction_ = kForward; // iter_ is pointing just before the entries for this->key(), @@ -144,11 +164,13 @@ void DBIter::Next() { saved_key_.clear(); return; } + } else { + // if we did not reverse direction, save the current key as the one which we + // must advance past. If we did reverse direction, it's already been set, + // so we don't need to do it. This conditional solves upstream issue #200. + SaveKey(ExtractUserKey(iter_->key()), skip); } - // Temporarily use saved_key_ as storage for key to skip. 
- std::string* skip = &saved_key_; - SaveKey(ExtractUserKey(iter_->key()), skip); FindNextUserEntry(true, skip); } @@ -288,12 +310,12 @@ void DBIter::SeekToLast() { } // anonymous namespace Iterator* NewDBIterator( - const std::string* dbname, - Env* env, + DBImpl* db, const Comparator* user_key_comparator, Iterator* internal_iter, - const SequenceNumber& sequence) { - return new DBIter(dbname, env, user_key_comparator, internal_iter, sequence); + SequenceNumber sequence, + uint32_t seed) { + return new DBIter(db, user_key_comparator, internal_iter, sequence, seed); } } // namespace hyperleveldb diff --git a/db/db_iter.h b/db/db_iter.h index 5a0a64d321..c1f165c339 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -11,15 +11,17 @@ namespace hyperleveldb { +class DBImpl; + // Return a new iterator that converts internal keys (yielded by // "*internal_iter") that were live at the specified "sequence" number // into appropriate user keys. extern Iterator* NewDBIterator( - const std::string* dbname, - Env* env, + DBImpl* db, const Comparator* user_key_comparator, Iterator* internal_iter, - const SequenceNumber& sequence); + SequenceNumber sequence, + uint32_t seed); } // namespace hyperleveldb diff --git a/db/db_test.cc b/db/db_test.cc index 7e8b4a69c8..ad07288374 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -614,6 +614,47 @@ TEST(DBTest, GetPicksCorrectFile) { } while (ChangeOptions()); } +#if 0 +TEST(DBTest, GetEncountersEmptyLevel) { + do { + // Arrange for the following to happen: + // * sstable A in level 0 + // * nothing in level 1 + // * sstable B in level 2 + // Then do enough Get() calls to arrange for an automatic compaction + // of sstable A. A bug would cause the compaction to be marked as + // occuring at level 1 (instead of the correct level 0). 
+ + // Step 1: First place sstables in levels 0 and 2 + int compaction_count = 0; + while (NumTableFilesAtLevel(0) == 0 || + NumTableFilesAtLevel(2) == 0) { + ASSERT_LE(compaction_count, 100) << "could not fill levels 0 and 2"; + compaction_count++; + Put("a", "begin"); + Put("z", "end"); + dbfull()->TEST_CompactMemTable(); + } + + // Step 2: clear level 1 if necessary. + dbfull()->TEST_CompactRange(1, NULL, NULL); + ASSERT_EQ(NumTableFilesAtLevel(0), 1); + ASSERT_EQ(NumTableFilesAtLevel(1), 0); + ASSERT_EQ(NumTableFilesAtLevel(2), 1); + + // Step 3: read a bunch of times + for (int i = 0; i < 1000; i++) { + ASSERT_EQ("NOT_FOUND", Get("missing")); + } + + // Step 4: Wait for compaction to finish + env_->SleepForMicroseconds(1000000); + + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + } while (ChangeOptions()); +} +#endif + TEST(DBTest, IterEmpty) { Iterator* iter = db_->NewIterator(ReadOptions()); @@ -1793,6 +1834,23 @@ class ModelDB: public DB { return new ModelIter(snapshot_state, false); } } + virtual void GetReplayTimestamp(std::string* timestamp) { + } + virtual void AllowGarbageCollectBeforeTimestamp(const std::string& timestamp) { + } + virtual bool ValidateTimestamp(const std::string&) { + return false; + } + virtual int CompareTimestamps(const std::string&, const std::string&) { + return 0; + } + virtual Status GetReplayIterator(const std::string& timestamp, + ReplayIterator** iter) { + *iter = NULL; + return Status::OK(); + } + virtual void ReleaseReplayIterator(ReplayIterator* iter) { + } virtual const Snapshot* GetSnapshot() { ModelSnapshot* snapshot = new ModelSnapshot; snapshot->map_ = map_; @@ -1828,6 +1886,8 @@ class ModelDB: public DB { } virtual void CompactRange(const Slice* start, const Slice* end) { } + virtual Status LiveBackup(const Slice& name) { + } private: class ModelIter: public Iterator { @@ -1990,6 +2050,71 @@ TEST(DBTest, Randomized) { } while (ChangeOptions()); } +TEST(DBTest, Replay) { + std::string ts; + db_->GetReplayTimestamp(&ts); + 
ASSERT_OK(Put("key", "v0")); + ASSERT_OK(Put("key", "v1")); + ASSERT_OK(Put("key", "v2")); + ASSERT_OK(Put("key", "v3")); + ASSERT_OK(Put("key", "v4")); + ASSERT_OK(Put("key", "v5")); + ASSERT_OK(Put("key", "v6")); + ASSERT_OK(Put("key", "v7")); + ASSERT_OK(Put("key", "v8")); + ASSERT_OK(Put("key", "v9")); + + // get the iterator + ReplayIterator* iter = NULL; + ASSERT_OK(db_->GetReplayIterator(ts, &iter)); + + // Iterate over what was there to start with + ASSERT_TRUE(iter->Valid()); + ASSERT_TRUE(iter->HasValue()); + ASSERT_EQ("key", iter->key().ToString()); + ASSERT_EQ("v9", iter->value().ToString()); + iter->Next(); + // The implementation is allowed to return things twice. + // This is a case where it will. + ASSERT_TRUE(iter->Valid()); + ASSERT_TRUE(iter->HasValue()); + ASSERT_EQ("key", iter->key().ToString()); + ASSERT_EQ("v9", iter->value().ToString()); + iter->Next(); + // Now it's no longer valid. + ASSERT_TRUE(!iter->Valid()); + ASSERT_TRUE(!iter->Valid()); + + // Add another and iterate some more + ASSERT_OK(Put("key", "v10")); + ASSERT_TRUE(iter->Valid()); + ASSERT_TRUE(iter->HasValue()); + ASSERT_EQ("key", iter->key().ToString()); + ASSERT_EQ("v10", iter->value().ToString()); + iter->Next(); + ASSERT_TRUE(!iter->Valid()); + + // Dump the memtable + dbfull()->TEST_CompactMemTable(); + + // Write into the new MemTable and iterate some more + ASSERT_OK(Put("key", "v11")); + ASSERT_TRUE(iter->Valid()); + ASSERT_TRUE(iter->HasValue()); + ASSERT_EQ("key", iter->key().ToString()); + ASSERT_EQ("v11", iter->value().ToString()); + iter->Next(); + ASSERT_TRUE(!iter->Valid()); + + // What does it do on delete? 
+ ASSERT_OK(Delete("key")); + ASSERT_TRUE(iter->Valid()); + ASSERT_TRUE(!iter->HasValue()); + ASSERT_EQ("key", iter->key().ToString()); + iter->Next(); + ASSERT_TRUE(!iter->Valid()); +} + std::string MakeKey(unsigned int num) { char buf[30]; snprintf(buf, sizeof(buf), "%016u", num); diff --git a/db/dbformat.h b/db/dbformat.h index 1571c5e072..386f38e578 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -40,6 +40,9 @@ static const int kL0_StopWritesTrigger = 12; // space if the same key space is being repeatedly overwritten. static const int kMaxMemCompactLevel = 2; +// Approximate gap in bytes between samples of data read during iteration. +static const int kReadBytesPeriod = 1048576; + } // namespace config class InternalKey; diff --git a/db/memtable.cc b/db/memtable.cc index a20a8e2eaa..73b4cf6c37 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -19,6 +19,10 @@ static Slice GetLengthPrefixedSlice(const char* data) { return Slice(p, len); } +static Slice GetLengthPrefixedSlice(std::pair tk) { + return GetLengthPrefixedSlice(tk.second); +} + MemTable::MemTable(const InternalKeyComparator& cmp) : comparator_(cmp), refs_(0), @@ -34,11 +38,16 @@ size_t MemTable::ApproximateMemoryUsage() { return arena_.MemoryUsage(); } -int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr) +int MemTable::KeyComparator::operator()(TableKey ak, TableKey bk) const { + if (ak.first < bk.first) { + return -1; + } else if (ak.first > bk.first) { + return 1; + } // Internal keys are encoded as length-prefixed strings. 
- Slice a = GetLengthPrefixedSlice(aptr); - Slice b = GetLengthPrefixedSlice(bptr); + Slice a = GetLengthPrefixedSlice(ak); + Slice b = GetLengthPrefixedSlice(bk); return comparator.Compare(a, b); } @@ -54,10 +63,15 @@ static const char* EncodeKey(std::string* scratch, const Slice& target) { class MemTableIterator: public Iterator { public: - explicit MemTableIterator(MemTable::Table* table) : iter_(table) { } + explicit MemTableIterator(MemTable::Table* table, + MemTable::KeyComparator* cmp) + : iter_(table), comparator_(cmp) { } virtual bool Valid() const { return iter_.Valid(); } - virtual void Seek(const Slice& k) { iter_.Seek(EncodeKey(&tmp_, k)); } + virtual void Seek(const Slice& k) { + uint64_t keynum = comparator_->comparator.user_comparator()->KeyNum(Slice(k.data(), k.size() - 8)); + iter_.Seek(std::make_pair(keynum, EncodeKey(&tmp_, k))); + } virtual void SeekToFirst() { iter_.SeekToFirst(); } virtual void SeekToLast() { iter_.SeekToLast(); } virtual void Next() { iter_.Next(); } @@ -72,6 +86,7 @@ class MemTableIterator: public Iterator { private: MemTable::Table::Iterator iter_; + MemTable::KeyComparator* comparator_; std::string tmp_; // For passing to EncodeKey // No copying allowed @@ -80,7 +95,7 @@ class MemTableIterator: public Iterator { }; Iterator* MemTable::NewIterator() { - return new MemTableIterator(&table_); + return new MemTableIterator(&table_, &comparator_); } void MemTable::Add(SequenceNumber s, ValueType type, @@ -112,18 +127,22 @@ void MemTable::Add(SequenceNumber s, ValueType type, p = EncodeVarint32(p, val_size); memcpy(p, value.data(), val_size); assert((p + val_size) - buf == encoded_len); - Table::InsertHint ih(&table_, buf); + uint64_t keynum = comparator_.comparator.user_comparator()->KeyNum(key); + TableKey tk(keynum, buf); + Table::InsertHint ih(&table_, tk); { MutexLock l(&mtx_); - table_.InsertWithHint(&ih, buf); + table_.InsertWithHint(&ih, tk); } } bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { 
Slice memkey = key.memtable_key(); Table::Iterator iter(&table_); - iter.Seek(memkey.data()); + uint64_t keynum = comparator_.comparator.user_comparator()->KeyNum(key.user_key()); + TableKey tk(keynum, memkey.data()); + iter.Seek(tk); if (iter.Valid()) { // entry format is: // klength varint32 @@ -134,10 +153,11 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { // Check that it belongs to same user key. We do not check the // sequence number since the Seek() call above should have skipped // all entries with overly large sequence numbers. - const char* entry = iter.key(); + const char* entry = iter.key().second; uint32_t key_length; const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length); - if (comparator_.comparator.user_comparator()->Compare( + if (iter.key().first == tk.first && + comparator_.comparator.user_comparator()->Compare( Slice(key_ptr, key_length - 8), key.user_key()) == 0) { // Correct user key diff --git a/db/memtable.h b/db/memtable.h index 834b43b6db..c69192f1d1 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -67,16 +67,17 @@ class MemTable { private: ~MemTable(); // Private since only Unref() should be used to delete it + typedef std::pair TableKey; struct KeyComparator { const InternalKeyComparator comparator; explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { } - int operator()(const char* a, const char* b) const; + int operator()(TableKey a, TableKey b) const; }; friend class MemTableIterator; friend class MemTableBackwardIterator; - typedef SkipList Table; + typedef SkipList Table; KeyComparator comparator_; int refs_; diff --git a/db/replay_iterator.cc b/db/replay_iterator.cc new file mode 100644 index 0000000000..bb8341af92 --- /dev/null +++ b/db/replay_iterator.cc @@ -0,0 +1,204 @@ +// Copyright (c) 2013 The HyperLevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. 
See the AUTHORS file for names of contributors. + +#include "replay_iterator.h" + +#include "filename.h" +#include "db_impl.h" +#include "dbformat.h" +#include "../hyperleveldb/env.h" +#include "../hyperleveldb/iterator.h" +#include "../port/port.h" +#include "../util/logging.h" +#include "../util/mutexlock.h" +#include "../util/random.h" + +namespace hyperleveldb { + +ReplayIterator::ReplayIterator() { +} + +ReplayIterator::~ReplayIterator() { +} + +ReplayState::ReplayState(Iterator* i, SequenceNumber s, SequenceNumber l) + : mem_(NULL), + iter_(i), + seq_start_(s), + seq_limit_(l) { +} + +ReplayState::ReplayState(MemTable* m, SequenceNumber s) + : mem_(m), + iter_(NULL), + seq_start_(s), + seq_limit_(0) { +} + +ReplayIteratorImpl::ReplayIteratorImpl(DBImpl* db, port::Mutex* mutex, const Comparator* cmp, + Iterator* iter, MemTable* m, SequenceNumber s) + : ReplayIterator(), + db_(db), + mutex_(mutex), + user_comparator_(cmp), + start_at_(s), + valid_(), + status_(), + has_current_user_key_(false), + current_user_key_(), + current_user_sequence_(), + rs_(iter, s, kMaxSequenceNumber), + mems_() { + m->Ref(); + mems_.push_back(ReplayState(m, s)); +} + +ReplayIteratorImpl::~ReplayIteratorImpl() { +} + +bool ReplayIteratorImpl::Valid() { + Prime(); + return valid_; +} + +void ReplayIteratorImpl::Next() { + rs_.iter_->Next(); +} + +bool ReplayIteratorImpl::HasValue() { + ParsedInternalKey ikey; + return ParseKey(&ikey) && ikey.type == kTypeValue; +} + +Slice ReplayIteratorImpl::key() const { + assert(valid_); + return ExtractUserKey(rs_.iter_->key()); +} + +Slice ReplayIteratorImpl::value() const { + assert(valid_); + return rs_.iter_->value(); +} + +Status ReplayIteratorImpl::status() const { + if (!status_.ok()) { + return status_; + } else { + return rs_.iter_->status(); + } +} + +void ReplayIteratorImpl::enqueue(MemTable* m, SequenceNumber s) { + m->Ref(); + mems_.push_back(ReplayState(m, s)); +} + +void ReplayIteratorImpl::cleanup() { + mutex_->Unlock(); + if 
(rs_.iter_) { + delete rs_.iter_; + } + if (rs_.mem_) { + rs_.mem_->Unref(); + } + mutex_->Lock(); + rs_.iter_ = NULL; + rs_.mem_ = NULL; + + while (!mems_.empty()) { + MemTable* mem = mems_.front().mem_; + Iterator* iter = mems_.front().iter_; + mutex_->Unlock(); + if (iter) { + delete iter; + } + if (mem) { + mem->Unref(); + } + mutex_->Lock(); + mems_.pop_front(); + } + + delete this; +} + +bool ReplayIteratorImpl::ParseKey(ParsedInternalKey* ikey) { + return ParseKey(rs_.iter_->key(), ikey); +} + +bool ReplayIteratorImpl::ParseKey(const Slice& k, ParsedInternalKey* ikey) { + if (!ParseInternalKey(k, ikey)) { + status_ = Status::Corruption("corrupted internal key in ReplayIteratorImpl"); + return false; + } else { + return true; + } +} + +void ReplayIteratorImpl::Prime() { + valid_ = false; + if (!status_.ok()) { + return; + } + while (true) { + assert(rs_.iter_); + while (rs_.iter_->Valid()) { + ParsedInternalKey ikey; + if (!ParseKey(rs_.iter_->key(), &ikey)) { + return; + } + // if we can consider this key, and it's recent enough and of the right + // type + if ((!has_current_user_key_ || + user_comparator_->Compare(ikey.user_key, + Slice(current_user_key_)) != 0 || + ikey.sequence >= current_user_sequence_) && + (ikey.sequence >= rs_.seq_start_ && + (ikey.type == kTypeDeletion || ikey.type == kTypeValue))) { + has_current_user_key_ = true; + current_user_key_.assign(ikey.user_key.data(), ikey.user_key.size()); + current_user_sequence_ = ikey.sequence; + valid_ = true; + return; + } + rs_.iter_->Next(); + } + if (!rs_.iter_->status().ok()) { + status_ = rs_.iter_->status(); + valid_ = false; + return; + } + // we're done with rs_.iter_ + has_current_user_key_ = false; + current_user_key_.assign("", 0); + current_user_sequence_ = kMaxSequenceNumber; + delete rs_.iter_; + rs_.iter_ = NULL; + { + MutexLock l(mutex_); + if (mems_.empty() || + rs_.seq_limit_ < mems_.front().seq_start_) { + rs_.seq_start_ = rs_.seq_limit_; + } else { + if (rs_.mem_) { + 
rs_.mem_->Unref(); + rs_.mem_ = NULL; + } + rs_.mem_ = mems_.front().mem_; + rs_.seq_start_ = mems_.front().seq_start_; + mems_.pop_front(); + } + } + rs_.seq_limit_ = db_->LastSequence(); + rs_.iter_ = rs_.mem_->NewIterator(); + rs_.iter_->SeekToFirst(); + assert(rs_.seq_start_ <= rs_.seq_limit_); + if (rs_.seq_start_ == rs_.seq_limit_) { + valid_ = false; + return; + } + } +} + +} // namespace leveldb diff --git a/db/replay_iterator.h b/db/replay_iterator.h new file mode 100644 index 0000000000..afeb5f68e9 --- /dev/null +++ b/db/replay_iterator.h @@ -0,0 +1,71 @@ +// Copyright (c) 2013 The HyperLevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef STORAGE_HYPERLEVELDB_DB_ROLLING_ITERATOR_H_ +#define STORAGE_HYPERLEVELDB_DB_ROLLING_ITERATOR_H_ + +#include +#include +#include "../hyperleveldb/db.h" +#include "../hyperleveldb/replay_iterator.h" +#include "dbformat.h" +#include "memtable.h" + +namespace hyperleveldb { + +class DBImpl; +struct ReplayState { + ReplayState(Iterator* i, SequenceNumber s, SequenceNumber l); + ReplayState(MemTable* m, SequenceNumber s); + MemTable* mem_; + Iterator* iter_; + SequenceNumber seq_start_; + SequenceNumber seq_limit_; +}; + +class ReplayIteratorImpl : public ReplayIterator { + public: + // Refs the memtable on its own; caller must hold mutex while creating this + ReplayIteratorImpl(DBImpl* db, port::Mutex* mutex, const Comparator* cmp, + Iterator* iter, MemTable* m, SequenceNumber s); + virtual bool Valid(); + virtual void Next(); + virtual bool HasValue(); + virtual Slice key() const; + virtual Slice value() const; + virtual Status status() const; + + // extra interface + + // we ref the memtable; caller holds mutex passed into ctor + // REQUIRES: caller must hold mutex passed into ctor + void enqueue(MemTable* m, SequenceNumber s); + + // REQUIRES: caller must hold mutex passed 
into ctor + void cleanup(); // calls delete this; + + private: + virtual ~ReplayIteratorImpl(); + bool ParseKey(ParsedInternalKey* ikey); + bool ParseKey(const Slice& k, ParsedInternalKey* ikey); + void Prime(); + + DBImpl* const db_; + port::Mutex* mutex_; + const Comparator* const user_comparator_; + SequenceNumber const start_at_; + bool valid_; + Status status_; + + bool has_current_user_key_; + std::string current_user_key_; + SequenceNumber current_user_sequence_; + + ReplayState rs_; + std::list mems_; +}; + +} // namespace hyperleveldb + +#endif // STORAGE_HYPERLEVELDB_DB_ROLLING_ITERATOR_H_ diff --git a/db/skiplist.h b/db/skiplist.h index 12604ae337..8528d3204d 100644 --- a/db/skiplist.h +++ b/db/skiplist.h @@ -351,7 +351,7 @@ template SkipList::SkipList(Comparator cmp, Arena* arena) : compare_(cmp), arena_(arena), - head_(NewNode(0 /* any key will do */, kMaxHeight)), + head_(NewNode(Key(), kMaxHeight)), max_height_(reinterpret_cast(1)), rnd_(0xdeadbeef) { for (int i = 0; i < kMaxHeight; i++) { diff --git a/db/version_set.cc b/db/version_set.cc index 77665e9f12..f6cfd87308 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -176,30 +176,41 @@ bool SomeFileOverlapsRange( // is the largest key that occurs in the file, and value() is an // 16-byte value containing the file number and file size, both // encoded using EncodeFixed64.
+// +// If num != 0, then do not call SeekToLast, Prev class Version::LevelFileNumIterator : public Iterator { public: LevelFileNumIterator(const InternalKeyComparator& icmp, - const std::vector* flist) + const std::vector* flist, + uint64_t num) : icmp_(icmp), flist_(flist), - index_(flist->size()) { // Marks as invalid + index_(flist->size()), // Marks as invalid + number_(num) { } virtual bool Valid() const { return index_ < flist_->size(); } virtual void Seek(const Slice& target) { index_ = FindFile(icmp_, *flist_, target); + Bump(); + } + virtual void SeekToFirst() { + index_ = 0; + Bump(); } - virtual void SeekToFirst() { index_ = 0; } virtual void SeekToLast() { + assert(number_ == 0); index_ = flist_->empty() ? 0 : flist_->size() - 1; } virtual void Next() { assert(Valid()); index_++; + Bump(); } virtual void Prev() { assert(Valid()); + assert(number_ == 0); if (index_ == 0) { index_ = flist_->size(); // Marks as invalid } else { @@ -218,9 +229,16 @@ class Version::LevelFileNumIterator : public Iterator { } virtual Status status() const { return Status::OK(); } private: + void Bump() { + while (index_ < flist_->size() && + (*flist_)[index_]->number < number_) { + ++index_; + } + } const InternalKeyComparator icmp_; const std::vector* const flist_; uint32_t index_; + uint64_t number_; // Backing store for value(). Holds the file number and size. 
mutable char value_buf_[16]; @@ -241,14 +259,19 @@ static Iterator* GetFileIterator(void* arg, } Iterator* Version::NewConcatenatingIterator(const ReadOptions& options, - int level) const { + int level, uint64_t num) const { return NewTwoLevelIterator( - new LevelFileNumIterator(vset_->icmp_, &files_[level]), + new LevelFileNumIterator(vset_->icmp_, &files_[level], num), &GetFileIterator, vset_->table_cache_, options); } void Version::AddIterators(const ReadOptions& options, std::vector* iters) { + return AddSomeIterators(options, 0, iters); +} + +void Version::AddSomeIterators(const ReadOptions& options, uint64_t num, + std::vector* iters) { // Merge all level zero files together since they may overlap for (size_t i = 0; i < files_[0].size(); i++) { iters->push_back( @@ -261,7 +284,7 @@ void Version::AddIterators(const ReadOptions& options, // lazily. for (int level = 1; level < config::kNumLevels; level++) { if (!files_[level].empty()) { - iters->push_back(NewConcatenatingIterator(options, level)); + iters->push_back(NewConcatenatingIterator(options, level, num)); } } } @@ -300,6 +323,51 @@ static bool NewestFirst(FileMetaData* a, FileMetaData* b) { return a->number > b->number; } +void Version::ForEachOverlapping(Slice user_key, Slice internal_key, + void* arg, + bool (*func)(void*, int, FileMetaData*)) { + // TODO(sanjay): Change Version::Get() to use this function. + const Comparator* ucmp = vset_->icmp_.user_comparator(); + + // Search level-0 in order from newest to oldest. + std::vector tmp; + tmp.reserve(files_[0].size()); + for (uint32_t i = 0; i < files_[0].size(); i++) { + FileMetaData* f = files_[0][i]; + if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 && + ucmp->Compare(user_key, f->largest.user_key()) <= 0) { + tmp.push_back(f); + } + } + if (!tmp.empty()) { + std::sort(tmp.begin(), tmp.end(), NewestFirst); + for (uint32_t i = 0; i < tmp.size(); i++) { + if (!(*func)(arg, 0, tmp[i])) { + return; + } + } + } + + // Search other levels. 
+ for (int level = 1; level < config::kNumLevels; level++) { + size_t num_files = files_[level].size(); + if (num_files == 0) continue; + + // Binary search to find earliest index whose largest key >= internal_key. + uint32_t index = FindFile(vset_->icmp_, files_[level], internal_key); + if (index < num_files) { + FileMetaData* f = files_[level][index]; + if (ucmp->Compare(user_key, f->smallest.user_key()) < 0) { + // All of "f" is past any data for user_key + } else { + if (!(*func)(arg, level, f)) { + return; + } + } + } + } +} + Status Version::Get(const ReadOptions& options, const LookupKey& k, std::string* value, @@ -399,6 +467,57 @@ Status Version::Get(const ReadOptions& options, return Status::NotFound(Slice()); // Use an empty error message for speed } +bool Version::UpdateStats(const GetStats& stats) { + FileMetaData* f = stats.seek_file; + if (f != NULL) { + f->allowed_seeks--; + if (f->allowed_seeks <= 0 && file_to_compact_ == NULL) { + file_to_compact_ = f; + file_to_compact_level_ = stats.seek_file_level; + return true; + } + } + return false; +} + +bool Version::RecordReadSample(Slice internal_key) { + ParsedInternalKey ikey; + if (!ParseInternalKey(internal_key, &ikey)) { + return false; + } + + struct State { + GetStats stats; // Holds first matching file + int matches; + + static bool Match(void* arg, int level, FileMetaData* f) { + State* state = reinterpret_cast(arg); + state->matches++; + if (state->matches == 1) { + // Remember first match. + state->stats.seek_file = f; + state->stats.seek_file_level = level; + } + // We can stop iterating once we have a second match. + return state->matches < 2; + } + }; + + State state; + state.matches = 0; + ForEachOverlapping(ikey.user_key, internal_key, &state, &State::Match); + + // Must have at least two matches since we want to merge across + // files. But what if we have a single file that contains many + // overwrites and deletions? Should we have another mechanism for + // finding such files? 
+ if (state.matches >= 2) { + // 1MB cost is about 1 seek (see comment in Builder::Apply). + return UpdateStats(state.stats); + } + return false; +} + void Version::Ref() { ++refs_; } @@ -447,6 +566,8 @@ void Version::GetOverlappingInputs( const InternalKey* begin, const InternalKey* end, std::vector* inputs) { + assert(level >= 0); + assert(level < config::kNumLevels); inputs->clear(); Slice user_begin, user_end; if (begin != NULL) { @@ -1180,7 +1301,7 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) { } else { // Create concatenating iterator for the files from this level list[num++] = NewTwoLevelIterator( - new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]), + new Version::LevelFileNumIterator(icmp_, &c->inputs_[which], 0), &GetFileIterator, table_cache_, options); } } @@ -1199,20 +1320,21 @@ struct CompactionBoundary { }; struct CmpByRange { - CmpByRange(const Comparator* cmp) : cmp_(cmp) {} + CmpByRange(const InternalKeyComparator* cmp) : cmp_(cmp) {} bool operator () (const FileMetaData* lhs, const FileMetaData* rhs) { - int smallest = cmp_->Compare(lhs->smallest.user_key(), rhs->smallest.user_key()); + int smallest = cmp_->Compare(lhs->smallest, rhs->smallest); if (smallest == 0) { - return cmp_->Compare(lhs->largest.user_key(), rhs->largest.user_key()) < 0; + return cmp_->Compare(lhs->largest, rhs->largest) < 0; } return smallest < 0; } private: - const Comparator* cmp_; + const InternalKeyComparator* cmp_; }; // Stores the compaction boundaries between level and level + 1 -void VersionSet::GetCompactionBoundaries(int level, +void VersionSet::GetCompactionBoundaries(Version* v, + int level, std::vector* LA, std::vector* LB, std::vector* LA_sizes, @@ -1220,12 +1342,12 @@ void VersionSet::GetCompactionBoundaries(int level, std::vector* boundaries) { const Comparator* user_cmp = icmp_.user_comparator(); - *LA = current_->files_[level + 0]; - *LB = current_->files_[level + 1]; + *LA = v->files_[level + 0]; + *LB = v->files_[level + 1]; 
*LA_sizes = std::vector(LA->size() + 1, 0); *LB_sizes = std::vector(LB->size() + 1, 0); - std::sort(LA->begin(), LA->end(), CmpByRange(user_cmp)); - std::sort(LB->begin(), LB->end(), CmpByRange(user_cmp)); + std::sort(LA->begin(), LA->end(), CmpByRange(&icmp_)); + std::sort(LB->begin(), LB->end(), CmpByRange(&icmp_)); boundaries->resize(LA->size()); // compute sizes @@ -1259,7 +1381,7 @@ void VersionSet::GetCompactionBoundaries(int level, } } -int VersionSet::PickCompactionLevel(bool* locked) { +int VersionSet::PickCompactionLevel(bool* locked, bool seek_driven) const { // Find an unlocked level has score >= 1 where level + 1 has score < 1. int level = config::kNumLevels; for (int i = 0; i + 1 < config::kNumLevels; ++i) { @@ -1267,11 +1389,19 @@ int VersionSet::PickCompactionLevel(bool* locked) { continue; } if (current_->compaction_scores_[i + 0] >= 1.0 && - current_->compaction_scores_[i + 1] < 1.0) { + (i + 2 >= config::kNumLevels || + current_->compaction_scores_[i + 1] < 1.0)) { level = i; break; } } + if (seek_driven && + level == config::kNumLevels && + current_->file_to_compact_ != NULL && + !locked[current_->file_to_compact_level_ + 0] && + !locked[current_->file_to_compact_level_ + 1]) { + level = current_->file_to_compact_level_; + } return level; } @@ -1279,16 +1409,16 @@ static bool OldestFirst(FileMetaData* a, FileMetaData* b) { return a->number < b->number; } -Compaction* VersionSet::PickCompaction(int level) { +Compaction* VersionSet::PickCompaction(Version* v, int level) { assert(0 <= level && level < config::kNumLevels); bool trivial = false; - if (current_->files_[level].empty()) { + if (v->files_[level].empty()) { return NULL; } Compaction* c = new Compaction(level); - c->input_version_ = current_; + c->input_version_ = v; c->input_version_->Ref(); if (level > 0) { @@ -1297,7 +1427,7 @@ Compaction* VersionSet::PickCompaction(int level) { std::vector LA_sizes; std::vector LB_sizes; std::vector boundaries; - GetCompactionBoundaries(level, &LA, 
&LB, &LA_sizes, &LB_sizes, &boundaries); + GetCompactionBoundaries(v, level, &LA, &LB, &LA_sizes, &LB_sizes, &boundaries); // find the best set of files: maximize the ratio of sizeof(LA)/sizeof(LB) // while keeping sizeof(LA)+sizeof(LB) < some threshold. If there's a tie @@ -1358,6 +1488,9 @@ Compaction* VersionSet::PickCompaction(int level) { c->inputs_[1].push_back(LB[i]); } c->SetRatio(best_ratio); + // pick the file to compact in this level + } else if (v->file_to_compact_ != NULL) { + c->inputs_[0].push_back(v->file_to_compact_); // otherwise just pick the file with least overlap } else { assert(level >= 0); @@ -1379,15 +1512,13 @@ Compaction* VersionSet::PickCompaction(int level) { } } } else { - std::vector tmp(current_->files_[0]); + std::vector tmp(v->files_[0]); std::sort(tmp.begin(), tmp.end(), OldestFirst); for (size_t i = 0; i < tmp.size() && c->inputs_[0].size() < 32; ++i) { c->inputs_[0].push_back(tmp[i]); } } - assert(!c->inputs_[0].empty()); - if (!trivial) { SetupOtherInputs(c); } @@ -1398,13 +1529,13 @@ void VersionSet::SetupOtherInputs(Compaction* c) { const int level = c->level(); InternalKey smallest, largest; GetRange(c->inputs_[0], &smallest, &largest); - current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1]); + c->input_version_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1]); // Update the place where we will do the next compaction for this level. // We update this immediately instead of waiting for the VersionEdit // to be applied so that if the compaction fails, we will try a different // key range next time. 
- compact_pointer_[level] = largest.Encode().ToString(); + //compact_pointer_[level] = largest.Encode().ToString(); c->edit_.SetCompactPointer(level, largest); } diff --git a/db/version_set.h b/db/version_set.h index 8c0a0655b8..c71ea15c15 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -64,6 +64,12 @@ class Version { // REQUIRES: This version has been saved (see VersionSet::SaveTo) void AddIterators(const ReadOptions&, std::vector* iters); + // Append to *iters a sequence of iterators that will + // yield a subset of the contents of this Version when merged together. + // Yields only files with number greater or equal to num + // REQUIRES: This version has been saved (see VersionSet::SaveTo) + void AddSomeIterators(const ReadOptions&, uint64_t num, std::vector* iters); + // Lookup the value for key. If found, store it in *val and // return OK. Else return a non-OK status. Fills *stats. // REQUIRES: lock is not held @@ -74,6 +80,17 @@ class Version { Status Get(const ReadOptions&, const LookupKey& key, std::string* val, GetStats* stats); + // Adds "stats" into the current state. Returns true if a new + // compaction may need to be triggered, false otherwise. + // REQUIRES: lock is held + bool UpdateStats(const GetStats& stats); + + // Record a sample of bytes read at the specified internal key. + // Samples are taken approximately once every config::kReadBytesPeriod + // bytes. Returns true if a new compaction may need to be triggered. 
+ // REQUIRES: lock is held + bool RecordReadSample(Slice key); + // Reference count management (so Versions do not disappear out from // under live iterators) void Ref(); @@ -108,7 +125,16 @@ class Version { friend class VersionSet; class LevelFileNumIterator; - Iterator* NewConcatenatingIterator(const ReadOptions&, int level) const; + Iterator* NewConcatenatingIterator(const ReadOptions&, int level, uint64_t num) const; + + // Call func(arg, level, f) for every file that overlaps user_key in + // order from newest to oldest. If an invocation of func returns + // false, makes no more calls. + // + // REQUIRES: user portion of internal_key == user_key. + void ForEachOverlapping(Slice user_key, Slice internal_key, + void* arg, + bool (*func)(void*, int, FileMetaData*)); VersionSet* vset_; // VersionSet to which this Version belongs Version* next_; // Next version in linked list @@ -118,13 +144,19 @@ class Version { // List of files per level std::vector files_[config::kNumLevels]; + // Next file to compact based on seek stats. + FileMetaData* file_to_compact_; + int file_to_compact_level_; + // Level that should be compacted next and its compaction score. // Score < 1 means compaction is not strictly needed. These fields // are initialized by Finalize(). double compaction_scores_[config::kNumLevels]; explicit Version(VersionSet* vset) - : vset_(vset), next_(this), prev_(this), refs_(0) { + : vset_(vset), next_(this), prev_(this), refs_(0), + file_to_compact_(NULL), + file_to_compact_level_(-1) { for (int i = 0; i < config::kNumLevels; ++i) { compaction_scores_[i] = -1; } @@ -202,13 +234,13 @@ class VersionSet { // Pick level for a new compaction. // Returns kNumLevels if there is no compaction to be done. // Otherwise returns the lowest unlocked level that may compact upwards. - int PickCompactionLevel(bool* locked); + int PickCompactionLevel(bool* locked, bool seek_driven) const; // Pick inputs for a new compaction at the specified level. 
// Returns NULL if there is no compaction to be done. // Otherwise returns a pointer to a heap-allocated object that // describes the compaction. Caller should delete the result. - Compaction* PickCompaction(int level); + Compaction* PickCompaction(Version* v, int level); // Return a compaction object for compacting the range [begin,end] in // the specified level. Returns NULL if there is nothing in that @@ -228,17 +260,8 @@ class VersionSet { Iterator* MakeInputIterator(Compaction* c); // Returns true iff some level needs a compaction. - bool NeedsCompaction(bool* levels) const { - Version* v = current_; - for (int i = 0; i + 1 < config::kNumLevels; ++i) { - if (!levels[i] && !levels[i + 1] && - v->compaction_scores_[i] >= 1.0 && - (i + 2 == config::kNumLevels || - v->compaction_scores_[i + 1] < 1.0)) { - return true; - } - } - return false; + bool NeedsCompaction(bool* levels, bool seek_driven) const { + return PickCompactionLevel(levels, seek_driven) != config::kNumLevels; } // Add all files listed in any live version to *live. 
@@ -273,7 +296,8 @@ class VersionSet { InternalKey* smallest, InternalKey* largest); - void GetCompactionBoundaries(int level, + void GetCompactionBoundaries(Version* version, + int level, std::vector* LA, std::vector* LB, std::vector* LA_sizes, diff --git a/hyperleveldb.upack.in b/hyperleveldb.upack.in new file mode 100644 index 0000000000..3920a76e08 --- /dev/null +++ b/hyperleveldb.upack.in @@ -0,0 +1,41 @@ +package hyperleveldb +| source="hyperleveldb" +| debian name="libhyperleveldb0" +| version="@VERSION@" +| release="1" +| license="BSD" +| copyright="2011-2013 The LevelDB Authors" +| homepage="http://hyperdex.org" +| tarball="http://hyperdex.org/src/hyperleveldb-{version}.tar.gz" +| debian section="libs" +| configure="--disable-static" +| summary="A fast key-value storage library" ++ {libdir}/libhyperleveldb.so.0 ++ {libdir}/libhyperleveldb.so.0.0.0 +'''LevelDB is a fast key-value storage library written at Google that provides +an ordered mapping from string keys to string values.''' + +subpackage hyperleveldb-devel +| debian name="libhyperleveldb-dev" +| debian section="libdevel" +| debian requires="libhyperleveldb0{self}" +| fedora requires="hyperleveldb{self}" +| summary="A fast key-value storage library (development files)" ++ {includedir}/hyperleveldb/cache.h ++ {includedir}/hyperleveldb/c.h ++ {includedir}/hyperleveldb/comparator.h ++ {includedir}/hyperleveldb/db.h ++ {includedir}/hyperleveldb/env.h ++ {includedir}/hyperleveldb/filter_policy.h ++ {includedir}/hyperleveldb/iterator.h ++ {includedir}/hyperleveldb/options.h ++ {includedir}/hyperleveldb/replay_iterator.h ++ {includedir}/hyperleveldb/slice.h ++ {includedir}/hyperleveldb/status.h ++ {includedir}/hyperleveldb/table_builder.h ++ {includedir}/hyperleveldb/table.h ++ {includedir}/hyperleveldb/write_batch.h ++ {libdir}/libhyperleveldb.so ++ {libdir}/pkgconfig/libhyperleveldb.pc + +exclude {libdir}/libhyperleveldb.la diff --git a/hyperleveldb/comparator.h b/hyperleveldb/comparator.h index 
5c056aee12..4f07f6b8e6 100644 --- a/hyperleveldb/comparator.h +++ b/hyperleveldb/comparator.h @@ -5,6 +5,7 @@ #ifndef STORAGE_HYPERLEVELDB_INCLUDE_COMPARATOR_H_ #define STORAGE_HYPERLEVELDB_INCLUDE_COMPARATOR_H_ +#include #include namespace hyperleveldb { @@ -51,6 +52,9 @@ class Comparator { // Simple comparator implementations may return with *key unchanged, // i.e., an implementation of this method that does nothing is correct. virtual void FindShortSuccessor(std::string* key) const = 0; + + // If unsure, return 0; + virtual uint64_t KeyNum(const Slice& key) const; }; // Return a builtin comparator that uses lexicographic byte-wise diff --git a/hyperleveldb/db.h b/hyperleveldb/db.h index 5f573ae5ef..14e2bee2d0 100644 --- a/hyperleveldb/db.h +++ b/hyperleveldb/db.h @@ -7,14 +7,16 @@ #include #include + #include "iterator.h" #include "options.h" +#include "replay_iterator.h" namespace hyperleveldb { // Update Makefile if you change these static const int kMajorVersion = 1; -static const int kMinorVersion = 11; +static const int kMinorVersion = 13; struct Options; struct ReadOptions; @@ -140,6 +142,36 @@ class DB { // db->CompactRange(NULL, NULL); virtual void CompactRange(const Slice* begin, const Slice* end) = 0; + // Create a live backup of a live LevelDB instance. + // The backup is stored in a directory named "backup-" under the top + // level of the open LevelDB database. The implementation is permitted, and + // even encouraged, to improve the performance of this call through + // hard-links. + virtual Status LiveBackup(const Slice& name) = 0; + + // Return an opaque timestamp that identifies the current point in time of the + // database. This timestamp may be subsequently presented to the + // NewReplayIterator method to create a ReplayIterator. + virtual void GetReplayTimestamp(std::string* timestamp) = 0; + + // Set the lower bound for manual garbage collection. This method only takes + // effect when Options.manual_garbage_collection is true. 
+ virtual void AllowGarbageCollectBeforeTimestamp(const std::string& timestamp) = 0; + + // Validate the timestamp + virtual bool ValidateTimestamp(const std::string& timestamp) = 0; + + // Compare two timestamps and return -1, 0, 1 for lt, eq, gt + virtual int CompareTimestamps(const std::string& lhs, const std::string& rhs) = 0; + + // Return a ReplayIterator that returns every write operation performed after + // the timestamp. + virtual Status GetReplayIterator(const std::string& timestamp, + ReplayIterator** iter) = 0; + + // Release a previously allocated replay iterator. + virtual void ReleaseReplayIterator(ReplayIterator* iter) = 0; + private: // No copying allowed DB(const DB&); diff --git a/hyperleveldb/env.h b/hyperleveldb/env.h index 7ff2aacb75..79199c57e9 100644 --- a/hyperleveldb/env.h +++ b/hyperleveldb/env.h @@ -94,6 +94,15 @@ class Env { virtual Status RenameFile(const std::string& src, const std::string& target) = 0; + // Copy file src to target. + virtual Status CopyFile(const std::string& src, + const std::string& target) = 0; + + // Link file src to target. + virtual Status LinkFile(const std::string& src, + const std::string& target) = 0; + + // Lock the specified file. Used to prevent concurrent access to // the same db by multiple processes. On failure, stores NULL in // *lock and returns non-OK. 
@@ -306,6 +315,12 @@ class EnvWrapper : public Env { Status RenameFile(const std::string& s, const std::string& t) { return target_->RenameFile(s, t); } + Status CopyFile(const std::string& s, const std::string& t) { + return target_->CopyFile(s, t); + } + Status LinkFile(const std::string& s, const std::string& t) { + return target_->LinkFile(s, t); + } Status LockFile(const std::string& f, FileLock** l) { return target_->LockFile(f, l); } diff --git a/hyperleveldb/options.h b/hyperleveldb/options.h index 29c2b0209c..0136bd99de 100644 --- a/hyperleveldb/options.h +++ b/hyperleveldb/options.h @@ -135,6 +135,17 @@ struct Options { // Default: NULL const FilterPolicy* filter_policy; + // Is the database used with the Replay mechanism? If yes, the lower bound on + // values to compact is (somewhat) left up to the application; if no, then + // LevelDB functions as usual, and uses snapshots to determine the lower + // bound. HyperLevelDB will always maintain the integrity of snapshots, so + // the application merely has the option to hold data as if it's holding a + // snapshot. This just prevents compaction from grabbing data before the app + // can get a snapshot. + // + // Default: false/no. + bool manual_garbage_collection; + // Create an Options object with default values for all fields. Options(); }; diff --git a/hyperleveldb/replay_iterator.h b/hyperleveldb/replay_iterator.h new file mode 100644 index 0000000000..c708125275 --- /dev/null +++ b/hyperleveldb/replay_iterator.h @@ -0,0 +1,57 @@ +// Copyright (c) 2013 The HyperLevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +#ifndef STORAGE_HYPERLEVELDB_INCLUDE_REPLAY_ITERATOR_H_ +#define STORAGE_HYPERLEVELDB_INCLUDE_REPLAY_ITERATOR_H_ + +#include "slice.h" +#include "status.h" + +namespace hyperleveldb { + +class ReplayIterator { + public: + ReplayIterator(); + + // An iterator is either positioned at a deleted key, present key/value pair, + // or not valid. This method returns true iff the iterator is valid. + virtual bool Valid() = 0; + + // Moves to the next entry in the source. After this call, Valid() is + // true iff the iterator was not positioned at the last entry in the source. + // REQUIRES: Valid() + virtual void Next() = 0; + + // Return true if the current entry points to a key-value pair. If this + // returns false, it means the current entry is a deleted entry. + virtual bool HasValue() = 0; + + // Return the key for the current entry. The underlying storage for + // the returned slice is valid only until the next modification of + // the iterator. + // REQUIRES: Valid() + virtual Slice key() const = 0; + + // Return the value for the current entry. The underlying storage for + // the returned slice is valid only until the next modification of + // the iterator. + // REQUIRES: !AtEnd() && !AtStart() + virtual Slice value() const = 0; + + // If an error has occurred, return it. Else return an ok status. + virtual Status status() const = 0; + + protected: + // must be released by giving it back to the DB + virtual ~ReplayIterator(); + + private: + // No copying allowed + ReplayIterator(const ReplayIterator&); + void operator=(const ReplayIterator&); +}; + +} // namespace hyperleveldb + +#endif // STORAGE_HYPERLEVELDB_INCLUDE_REPLAY_ITERATOR_H_ diff --git a/issues/issue178_test.cc b/issues/issue178_test.cc index 05dc01af47..ffba66a7fd 100644 --- a/issues/issue178_test.cc +++ b/issues/issue178_test.cc @@ -1,3 +1,7 @@ +// Copyright (c) 2013 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + // Test for issue 178: a manual compaction causes deleted data to reappear. #include #include diff --git a/issues/issue200_test.cc b/issues/issue200_test.cc new file mode 100644 index 0000000000..49aae91a31 --- /dev/null +++ b/issues/issue200_test.cc @@ -0,0 +1,70 @@ +// Test for issue 200: iterator reversal brings in keys newer than snapshots +#include +#include +#include + +#include "hyperleveldb/db.h" +#include "hyperleveldb/write_batch.h" +#include "util/testharness.h" + +namespace { + +class Issue200 { }; + +TEST(Issue200, Test) { + // Get rid of any state from an old run. + std::string dbpath = leveldb::test::TmpDir() + "/leveldb_200_iterator_test"; + DestroyDB(dbpath, leveldb::Options()); + + // Open database. + leveldb::DB* db; + leveldb::Options db_options; + db_options.create_if_missing = true; + db_options.compression = leveldb::kNoCompression; + ASSERT_OK(leveldb::DB::Open(db_options, dbpath, &db)); + + leveldb::WriteOptions write_options; + db->Put(write_options, leveldb::Slice("1", 1), leveldb::Slice("b", 1)); + db->Put(write_options, leveldb::Slice("2", 1), leveldb::Slice("c", 1)); + db->Put(write_options, leveldb::Slice("3", 1), leveldb::Slice("d", 1)); + db->Put(write_options, leveldb::Slice("4", 1), leveldb::Slice("e", 1)); + db->Put(write_options, leveldb::Slice("5", 1), leveldb::Slice("f", 1)); + + const leveldb::Snapshot *snapshot = db->GetSnapshot(); + + leveldb::ReadOptions read_options; + read_options.snapshot = snapshot; + leveldb::Iterator *iter = db->NewIterator(read_options); + + // Commenting out this Put changes the iterator behavior, + // gives the expected behavior. 
This is unexpected because + // the iterator is taken on a snapshot that was taken + // before the Put + db->Put(write_options, leveldb::Slice("25", 2), leveldb::Slice("cd", 2)); + + iter->Seek(leveldb::Slice("5", 1)); + ASSERT_EQ("5", iter->key().ToString()); + iter->Prev(); + ASSERT_EQ("4", iter->key().ToString()); + iter->Prev(); + ASSERT_EQ("3", iter->key().ToString()); + + // At this point the iterator is at "3", I expect the Next() call will + // move it to "4". But it stays on "3" + iter->Next(); + ASSERT_EQ("4", iter->key().ToString()); + iter->Next(); + ASSERT_EQ("5", iter->key().ToString()); + + // close database + delete iter; + db->ReleaseSnapshot(snapshot); + delete db; + DestroyDB(dbpath, leveldb::Options()); +} + +} // anonymous namespace + +int main(int argc, char** argv) { + return leveldb::test::RunAllTests(); +} diff --git a/util/comparator.cc b/util/comparator.cc index c6f6072a6d..ee14872238 100644 --- a/util/comparator.cc +++ b/util/comparator.cc @@ -4,15 +4,21 @@ #include #include + #include "../hyperleveldb/comparator.h" #include "../hyperleveldb/slice.h" #include "../port/port.h" +#include "coding.h" #include "logging.h" namespace hyperleveldb { Comparator::~Comparator() { } +uint64_t Comparator::KeyNum(const Slice& key) const { + return 0; +} + namespace { class BytewiseComparatorImpl : public Comparator { public: @@ -63,6 +69,22 @@ class BytewiseComparatorImpl : public Comparator { } // *key is a run of 0xffs. Leave it alone. 
} + + virtual uint64_t KeyNum(const Slice& key) const { + unsigned char buf[sizeof(uint64_t)]; + memset(buf, 0, sizeof(buf)); + memmove(buf, key.data(), std::min(key.size(), sizeof(uint64_t))); + uint64_t number; + number = static_cast(buf[0]) << 56 + | static_cast(buf[1]) << 48 + | static_cast(buf[2]) << 40 + | static_cast(buf[3]) << 32 + | static_cast(buf[4]) << 24 + | static_cast(buf[5]) << 16 + | static_cast(buf[6]) << 8 + | static_cast(buf[7]); + return number; + } }; } // namespace diff --git a/util/env_posix.cc b/util/env_posix.cc index c0346325dd..da70f6fa5b 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -324,10 +324,42 @@ class PosixMmapFile : public WritableFile { return s; } - virtual Status Sync() { + Status SyncDirIfManifest() { + const char* f = filename_.c_str(); + const char* sep = strrchr(f, '/'); + Slice basename; + std::string dir; + if (sep == NULL) { + dir = "."; + basename = f; + } else { + dir = std::string(f, sep - f); + basename = sep + 1; + } Status s; + if (basename.starts_with("MANIFEST")) { + int fd = open(dir.c_str(), O_RDONLY); + if (fd < 0) { + s = IOError(dir, errno); + } else { + if (fsync(fd) < 0) { + s = IOError(dir, errno); + } + close(fd); + } + } + return s; + } + + virtual Status Sync() { + // Ensure new files referred to by the manifest are in the filesystem. 
+ Status s = SyncDirIfManifest(); bool need_sync = false; + if (!s.ok()) { + return s; + } + { MutexLock l(&mtx_); need_sync = sync_offset_ != end_offset_; @@ -504,6 +536,54 @@ class PosixEnv : public Env { return result; } + virtual Status CopyFile(const std::string& src, const std::string& target) { + Status result; + int fd1; + int fd2; + + if (result.ok() && (fd1 = open(src.c_str(), O_RDONLY)) < 0) { + result = IOError(src, errno); + } + if (result.ok() && (fd2 = open(target.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644)) < 0) { + result = IOError(target, errno); + } + + ssize_t amt = 0; + char buf[512]; + + while (result.ok() && (amt = read(fd1, buf, 512)) > 0) { + if (write(fd2, buf, amt) != amt) { + result = IOError(src, errno); + } + } + + if (result.ok() && amt < 0) { + result = IOError(src, errno); + } + + if (fd1 >= 0 && close(fd1) < 0) { + if (result.ok()) { + result = IOError(src, errno); + } + } + + if (fd2 >= 0 && close(fd2) < 0) { + if (result.ok()) { + result = IOError(target, errno); + } + } + + return result; + } + + virtual Status LinkFile(const std::string& src, const std::string& target) { + Status result; + if (link(src.c_str(), target.c_str()) != 0) { + result = IOError(src, errno); + } + return result; + } + virtual Status LockFile(const std::string& fname, FileLock** lock) { *lock = NULL; Status result; diff --git a/util/options.cc b/util/options.cc index 0825f54067..ee96a406d0 100644 --- a/util/options.cc +++ b/util/options.cc @@ -22,7 +22,8 @@ Options::Options() block_size(4096), block_restart_interval(16), compression(kSnappyCompression), - filter_policy(NULL) { + filter_policy(NULL), + manual_garbage_collection(false) { } diff --git a/util/random.h b/util/random.h index 119b1aa1c1..23fe6f5d8e 100644 --- a/util/random.h +++ b/util/random.h @@ -16,7 +16,12 @@ class Random { private: uint32_t seed_; public: - explicit Random(uint32_t s) : seed_(s & 0x7fffffffu) { } + explicit Random(uint32_t s) : seed_(s & 0x7fffffffu) { + // Avoid bad 
seeds. + if (seed_ == 0 || seed_ == 2147483647L) { + seed_ = 1; + } + } uint32_t Next() { static const uint32_t M = 2147483647L; // 2^31-1 static const uint64_t A = 16807; // bits 14, 8, 7, 5, 2, 1, 0