Merge commit '596a35accad6e838e05180e79c1ea626eaf93a34' into develop

This commit is contained in:
Vinnie Falco
2013-11-19 11:32:55 -08:00
33 changed files with 1747 additions and 228 deletions

View File

@@ -7,26 +7,27 @@
*.o
*_test
# specific files
aclocal.m4
autom4te.cache/
benchmark
config.guess
config.h
config.h.in
config.log
config.status
config.sub
configure
db_bench
depcomp
install-sh
leveldbutil
libhyperleveldb.pc
libtool
ltmain.sh
m4/
Makefile
Makefile.in
Makefile.old
missing
stamp-h1
/aclocal.m4
/autom4te.cache/
/benchmark
/config.guess
/config.h
/config.h.in
/config.log
/config.status
/config.sub
/configure
/db_bench
/depcomp
/hyperleveldb.upack
/install-sh
/leveldbutil
/libhyperleveldb.pc
/libtool
/ltmain.sh
/m4/
/Makefile
/Makefile.in
/Makefile.old
/missing
/stamp-h1

View File

@@ -7,5 +7,8 @@ Google Inc.
Jeffrey Dean <jeff@google.com>
Sanjay Ghemawat <sanjay@google.com>
# Partial list of contributors:
Kevin Regan <kevin.d.regan@gmail.com>
# HyperLevelDB authors:
Robert Escriva <robert@hyperdex.org>

View File

@@ -26,8 +26,8 @@
## POSSIBILITY OF SUCH DAMAGE.
ACLOCAL_AMFLAGS = -I m4 ${ACLOCAL_FLAGS}
AM_CFLAGS = -DLEVELDB_PLATFORM_POSIX -fno-builtin-memcmp -fno-builtin-memmove
AM_CXXFLAGS = -DLEVELDB_PLATFORM_POSIX -fno-builtin-memcmp -fno-builtin-memmove
AM_CFLAGS = -DLEVELDB_PLATFORM_POSIX $(SNAPPY_FLAGS) -fno-builtin-memcmp -fno-builtin-memmove
AM_CXXFLAGS = -DLEVELDB_PLATFORM_POSIX $(SNAPPY_FLAGS) -fno-builtin-memcmp -fno-builtin-memmove
AM_MAKEFLAGS = --no-print-directory
pkgconfigdir = $(libdir)/pkgconfig
@@ -35,11 +35,19 @@ pkgconfig_DATA = libhyperleveldb.pc
EXTRA_DIST =
EXTRA_DIST += AUTHORS
EXTRA_DIST += doc/benchmark.html
EXTRA_DIST += doc/doc.css
EXTRA_DIST += doc/impl.html
EXTRA_DIST += doc/index.html
EXTRA_DIST += doc/log_format.txt
EXTRA_DIST += doc/table_format.txt
EXTRA_DIST += helpers/memenv/memenv.cc
EXTRA_DIST += helpers/memenv/memenv_test.cc
EXTRA_DIST += LICENSE
EXTRA_DIST += NEWS
EXTRA_DIST += port/README
EXTRA_DIST += README
EXTRA_DIST += TODO
EXTRA_DIST += port/README
nobase_include_HEADERS =
nobase_include_HEADERS += hyperleveldb/cache.h
@@ -51,6 +59,7 @@ nobase_include_HEADERS += hyperleveldb/filter_policy.h
nobase_include_HEADERS += hyperleveldb/iterator.h
nobase_include_HEADERS += hyperleveldb/options.h
nobase_include_HEADERS += hyperleveldb/slice.h
nobase_include_HEADERS += hyperleveldb/replay_iterator.h
nobase_include_HEADERS += hyperleveldb/status.h
nobase_include_HEADERS += hyperleveldb/table_builder.h
nobase_include_HEADERS += hyperleveldb/table.h
@@ -66,6 +75,7 @@ noinst_HEADERS += db/log_reader.h
noinst_HEADERS += db/log_writer.h
noinst_HEADERS += db/memtable.h
noinst_HEADERS += db/skiplist.h
noinst_HEADERS += db/replay_iterator.h
noinst_HEADERS += db/snapshot.h
noinst_HEADERS += db/table_cache.h
noinst_HEADERS += db/version_edit.h
@@ -110,6 +120,7 @@ libhyperleveldb_la_SOURCES += db/log_reader.cc
libhyperleveldb_la_SOURCES += db/log_writer.cc
libhyperleveldb_la_SOURCES += db/memtable.cc
libhyperleveldb_la_SOURCES += db/repair.cc
libhyperleveldb_la_SOURCES += db/replay_iterator.cc
libhyperleveldb_la_SOURCES += db/table_cache.cc
libhyperleveldb_la_SOURCES += db/version_edit.cc
libhyperleveldb_la_SOURCES += db/version_set.cc
@@ -138,6 +149,7 @@ libhyperleveldb_la_SOURCES += util/logging.cc
libhyperleveldb_la_SOURCES += util/options.cc
libhyperleveldb_la_SOURCES += util/status.cc
libhyperleveldb_la_SOURCES += port/port_posix.cc
libhyperleveldb_la_LIBADD = $(SNAPPY_LIBS) -lpthread
libhyperleveldb_la_LDFLAGS = -pthread
TESTUTIL = util/testutil.cc
@@ -153,6 +165,7 @@ EXTRA_PROGRAMS += db_bench_sqlite3
EXTRA_PROGRAMS += db_bench_tree_db
check_PROGRAMS =
check_PROGRAMS += autocompact_test
check_PROGRAMS += arena_test
check_PROGRAMS += bloom_test
check_PROGRAMS += c_test
@@ -172,6 +185,7 @@ check_PROGRAMS += version_edit_test
check_PROGRAMS += version_set_test
check_PROGRAMS += write_batch_test
check_PROGRAMS += issue178_test
check_PROGRAMS += issue200_test
TESTS = $(check_PROGRAMS)
@@ -191,6 +205,9 @@ db_bench_tree_db_LDADD = -lkyotocabinet
leveldbutil_SOURCES = db/leveldb_main.cc
leveldbutil_LDADD = libhyperleveldb.la -lpthread
autocompact_test_SOURCES = db/autocompact_test.cc $(TESTHARNESS)
autocompact_test_LDADD = libhyperleveldb.la -lpthread
arena_test_SOURCES = util/arena_test.cc $(TESTHARNESS)
arena_test_LDADD = libhyperleveldb.la -lpthread
@@ -247,3 +264,6 @@ write_batch_test_LDADD = libhyperleveldb.la -lpthread
issue178_test_SOURCES = issues/issue178_test.cc $(TESTHARNESS)
issue178_test_LDADD = libhyperleveldb.la -lpthread
issue200_test_SOURCES = issues/issue200_test.cc $(TESTHARNESS)
issue200_test_LDADD = libhyperleveldb.la -lpthread

View File

@@ -49,3 +49,14 @@ include/env.h
include/table.h
include/table_builder.h
Lower-level modules that most clients probably won't use directly
Install
=======
Get up and running quickly:
$ autoreconf -i
$ ./configure
$ make
# make install
# ldconfig

View File

@@ -42,7 +42,8 @@
#include <po6/threads/thread.h>
// e
#include <e/guard.h>
#include <e/popt.h>
#include <e/time.h>
// armnod
#include <armnod.h>
@@ -50,17 +51,9 @@
// numbers
#include <numbers.h>
static long _done = 0;
static long _number = 1000000;
static long _threads = 1;
static struct poptOption _popts[] = {
{"number", 'n', POPT_ARG_LONG, &_number, 0,
"perform N operations against the database (default: 1000000)", "N"},
{"threads", 't', POPT_ARG_LONG, &_threads, 0,
"run the test with T concurrent threads (default: 1)", "T"},
POPT_TABLEEND
};
static void
backup_thread(leveldb::DB*,
numbers::throughput_latency_logger* tll);
static void
worker_thread(leveldb::DB*,
@@ -68,50 +61,55 @@ worker_thread(leveldb::DB*,
const armnod::argparser& k,
const armnod::argparser& v);
static long _done = 0;
static long _number = 1000000;
static long _threads = 1;
static long _backup = 0;
static long _write_buf = 64ULL * 1024ULL * 1024ULL;
static const char* _output = "benchmark.log";
static const char* _dir = ".";
int
main(int argc, const char* argv[])
{
e::argparser ap;
ap.autohelp();
ap.arg().name('n', "number")
.description("perform N operations against the database (default: 1000000)")
.metavar("N")
.as_long(&_number);
ap.arg().name('t', "threads")
.description("run the test with T concurrent threads (default: 1)")
.metavar("T")
.as_long(&_threads);
ap.arg().name('o', "output")
.description("output file for benchmark results (default: benchmark.log)")
.as_string(&_output);
ap.arg().name('d', "db-dir")
.description("directory for leveldb storage (default: .)")
.as_string(&_dir);
ap.arg().name('w', "write-buffer")
.description("write buffer size (default: 64MB)")
.as_long(&_write_buf);
ap.arg().name('b', "backup")
.description("perform a live backup every N seconds (default: 0 (no backup))")
.as_long(&_backup);
armnod::argparser key_parser("key-");
armnod::argparser value_parser("value-");
std::vector<poptOption> popts;
poptOption s[] = {POPT_AUTOHELP {NULL, 0, POPT_ARG_INCLUDE_TABLE, _popts, 0, "Benchmark:", NULL}, POPT_TABLEEND};
popts.push_back(s[0]);
popts.push_back(s[1]);
popts.push_back(key_parser.options("Key Generation:"));
popts.push_back(value_parser.options("Value Generation:"));
popts.push_back(s[2]);
poptContext poptcon;
poptcon = poptGetContext(NULL, argc, argv, &popts.front(), POPT_CONTEXT_POSIXMEHARDER);
e::guard g = e::makeguard(poptFreeContext, poptcon); g.use_variable();
poptSetOtherOptionHelp(poptcon, "[OPTIONS]");
ap.add("Key Generation:", key_parser.parser());
ap.add("Value Generation:", value_parser.parser());
int rc;
while ((rc = poptGetNextOpt(poptcon)) != -1)
if (!ap.parse(argc, argv))
{
switch (rc)
{
case POPT_ERROR_NOARG:
case POPT_ERROR_BADOPT:
case POPT_ERROR_BADNUMBER:
case POPT_ERROR_OVERFLOW:
std::cerr << poptStrerror(rc) << " " << poptBadOption(poptcon, 0) << std::endl;
return EXIT_FAILURE;
case POPT_ERROR_OPTSTOODEEP:
case POPT_ERROR_BADQUOTE:
case POPT_ERROR_ERRNO:
default:
std::cerr << "logic error in argument parsing" << std::endl;
return EXIT_FAILURE;
}
return EXIT_FAILURE;
}
leveldb::Options opts;
opts.create_if_missing = true;
opts.write_buffer_size = 64ULL * 1024ULL * 1024ULL;
opts.write_buffer_size = write_buf;
opts.filter_policy = leveldb::NewBloomFilterPolicy(10);
leveldb::DB* db;
leveldb::Status st = leveldb::DB::Open(opts, "tmp", &db);
leveldb::Status st = leveldb::DB::Open(opts, _dir, &db);
if (!st.ok())
{
@@ -121,7 +119,7 @@ main(int argc, const char* argv[])
numbers::throughput_latency_logger tll;
if (!tll.open("benchmark.log"))
if (!tll.open(_output))
{
std::cerr << "could not open log: " << strerror(errno) << std::endl;
return EXIT_FAILURE;
@@ -130,6 +128,13 @@ main(int argc, const char* argv[])
typedef std::tr1::shared_ptr<po6::threads::thread> thread_ptr;
std::vector<thread_ptr> threads;
if (_backup > 0)
{
thread_ptr t(new po6::threads::thread(std::tr1::bind(backup_thread, db, &tll)));
threads.push_back(t);
t->start();
}
for (size_t i = 0; i < _threads; ++i)
{
thread_ptr t(new po6::threads::thread(std::tr1::bind(worker_thread, db, &tll, key_parser, value_parser)));
@@ -175,6 +180,47 @@ get_random()
return ret;
}
#define BILLION (1000ULL * 1000ULL * 1000ULL)
void
backup_thread(leveldb::DB* db,
numbers::throughput_latency_logger* tll)
{
uint64_t target = e::time() / BILLION;
target += _backup;
uint64_t idx = 0;
numbers::throughput_latency_logger::thread_state ts;
tll->initialize_thread(&ts);
while (__sync_fetch_and_add(&_done, 0) < _number)
{
uint64_t now = e::time() / BILLION;
if (now < target)
{
timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = 250ULL * 1000ULL * 1000ULL;
nanosleep(&ts, NULL);
}
else
{
target = now + _backup;
char buf[32];
snprintf(buf, 32, "%05lu", idx);
buf[31] = '\0';
leveldb::Slice name(buf);
leveldb::Status st;
tll->start(&ts, 4);
st = db->LiveBackup(name);
tll->finish(&ts);
assert(st.ok());
++idx;
}
}
}
void
worker_thread(leveldb::DB* db,
numbers::throughput_latency_logger* tll,
@@ -196,13 +242,15 @@ worker_thread(leveldb::DB* db,
// issue a "get"
std::string tmp;
leveldb::ReadOptions ropts;
tll->start(&ts, 1);
leveldb::Status rst = db->Get(ropts, leveldb::Slice(k.data(), k.size()), &tmp);
tll->finish(&ts);
assert(rst.ok() || rst.IsNotFound());
// issue a "put"
leveldb::WriteOptions wopts;
wopts.sync = false;
tll->start(&ts, 0);
tll->start(&ts, 2);
leveldb::Status wst = db->Put(wopts, leveldb::Slice(k.data(), k.size()), leveldb::Slice(v.data(), v.size()));
tll->finish(&ts);
assert(wst.ok());

View File

@@ -30,7 +30,7 @@
# Process this file with autoconf to produce a configure script.
AC_PREREQ([2.62])
AC_INIT([hyperleveldb], [2.0.dev], [robert@hyperdex.org])
AC_INIT([hyperleveldb], [1.0.dev], [robert@hyperdex.org])
AM_INIT_AUTOMAKE([foreign subdir-objects dist-bzip2])
m4_ifdef([AM_SILENT_RULES], [AM_SILENT_RULES([yes])])
LT_PREREQ([2.2])
@@ -58,6 +58,40 @@ AC_FUNC_MMAP
AC_CHECK_FUNCS([alarm clock_gettime mach_absolute_time ftruncate memmove mkdir munmap rmdir socket])
# Optional components
snappy_detect_hdr=yes
snappy_detect_lib=yes
AC_CHECK_LIB([snappy], [snappy_compress], [], [snappy_detect_hdr=no])
AC_CHECK_HEADER([snappy.h],,[snappy_detect_lib=no])
AC_ARG_ENABLE([snappy], [AS_HELP_STRING([--enable-snappy],
[build with Snappy @<:@default: auto@:>@])],
[snappy=${enableval}], [snappy=no])
if test x"${snappy}" = xyes; then
if test x"${snappy_detect_hdr}" != xyes; then
AC_MSG_ERROR([
-------------------------------------------------
LevelDB configured with the Snappy library.
libsnappy.so not found
Please install Snappy to continue.
-------------------------------------------------])
fi
if test x"${snappy_detect_lib}" != xyes; then
AC_MSG_ERROR([
-------------------------------------------------
LevelDB configured with the Snappy library.
snappy.h not found
Please install Snappy to continue.
-------------------------------------------------])
fi
fi
if test x"${snappy_detect_hdr}" = xyes -a x"${snappy_detect_lib}" = xyes; then
SNAPPY_FLAGS=-DSNAPPY
SNAPPY_LIBS=-lsnappy
else
SNAPPY_FLAGS=
SNAPPY_LIBS=
fi
AC_SUBST(SNAPPY_FLAGS)
AC_SUBST(SNAPPY_LIBS)
AC_CONFIG_FILES([Makefile libhyperleveldb.pc])
AC_CONFIG_FILES([Makefile libhyperleveldb.pc hyperleveldb.upack])
AC_OUTPUT

View File

@@ -0,0 +1,123 @@
// Copyright (c) 2013 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "hyperleveldb/db.h"
#include "db/db_impl.h"
#include "hyperleveldb/cache.h"
#include "util/testharness.h"
#include "util/testutil.h"
namespace leveldb {
class AutoCompactTest {
public:
std::string dbname_;
Cache* tiny_cache_;
Options options_;
DB* db_;
AutoCompactTest() {
dbname_ = test::TmpDir() + "/autocompact_test";
tiny_cache_ = NewLRUCache(100);
options_.block_cache = tiny_cache_;
DestroyDB(dbname_, options_);
options_.create_if_missing = true;
options_.compression = kNoCompression;
ASSERT_OK(DB::Open(options_, dbname_, &db_));
}
~AutoCompactTest() {
delete db_;
DestroyDB(dbname_, Options());
delete tiny_cache_;
}
std::string Key(int i) {
char buf[100];
snprintf(buf, sizeof(buf), "key%06d", i);
return std::string(buf);
}
uint64_t Size(const Slice& start, const Slice& limit) {
Range r(start, limit);
uint64_t size;
db_->GetApproximateSizes(&r, 1, &size);
return size;
}
void DoReads(int n);
};
static const int kValueSize = 200 * 1024;
static const int kTotalSize = 100 * 1024 * 1024;
static const int kCount = kTotalSize / kValueSize;
// Read through the first n keys repeatedly and check that they get
// compacted (verified by checking the size of the key space).
void AutoCompactTest::DoReads(int n) {
std::string value(kValueSize, 'x');
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
// Fill database
for (int i = 0; i < kCount; i++) {
ASSERT_OK(db_->Put(WriteOptions(), Key(i), value));
}
ASSERT_OK(dbi->TEST_CompactMemTable());
// Delete everything
for (int i = 0; i < kCount; i++) {
ASSERT_OK(db_->Delete(WriteOptions(), Key(i)));
}
ASSERT_OK(dbi->TEST_CompactMemTable());
// Get initial measurement of the space we will be reading.
const int64_t initial_size = Size(Key(0), Key(n));
const int64_t initial_other_size = Size(Key(n), Key(kCount));
// Read until size drops significantly.
std::string limit_key = Key(n);
for (int read = 0; true; read++) {
ASSERT_LT(read, 100) << "Taking too long to compact";
Iterator* iter = db_->NewIterator(ReadOptions());
for (iter->SeekToFirst();
iter->Valid() && iter->key().ToString() < limit_key;
iter->Next()) {
// Drop data
}
delete iter;
// Wait a little bit to allow any triggered compactions to complete.
Env::Default()->SleepForMicroseconds(1000000);
uint64_t size = Size(Key(0), Key(n));
fprintf(stderr, "iter %3d => %7.3f MB [other %7.3f MB]\n",
read+1, size/1048576.0, Size(Key(n), Key(kCount))/1048576.0);
if (size <= initial_size/10) {
break;
}
}
// Verify that the size of the key space not touched by the reads
// is pretty much unchanged.
const int64_t final_other_size = Size(Key(n), Key(kCount));
ASSERT_LE(final_other_size, initial_other_size + 1048576);
ASSERT_GE(final_other_size, initial_other_size/5 - 1048576);
}
TEST(AutoCompactTest, ReadAll) {
DoReads(kCount);
}
// HyperLevelDB's ratio-driven compactions always compact everything here. The
// reads trigger the compaction, but then the system decides it is more
// effiicient to just collect everything, emptying the db completely.
#if 0
TEST(AutoCompactTest, ReadHalf) {
DoReads(kCount/2);
}
#endif
} // namespace leveldb
int main(int argc, char** argv) {
return leveldb::test::RunAllTests();
}

View File

@@ -35,6 +35,7 @@ class CorruptionTest {
CorruptionTest() {
tiny_cache_ = NewLRUCache(100);
options_.env = &env_;
options_.block_cache = tiny_cache_;
dbname_ = test::TmpDir() + "/db_test";
DestroyDB(dbname_, options_);
@@ -50,17 +51,14 @@ class CorruptionTest {
delete tiny_cache_;
}
Status TryReopen(Options* options = NULL) {
Status TryReopen() {
delete db_;
db_ = NULL;
Options opt = (options ? *options : options_);
opt.env = &env_;
opt.block_cache = tiny_cache_;
return DB::Open(opt, dbname_, &db_);
return DB::Open(options_, dbname_, &db_);
}
void Reopen(Options* options = NULL) {
ASSERT_OK(TryReopen(options));
void Reopen() {
ASSERT_OK(TryReopen());
}
void RepairDB() {
@@ -92,6 +90,10 @@ class CorruptionTest {
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
uint64_t key;
Slice in(iter->key());
if (in == "" || in == "~") {
// Ignore boundary keys.
continue;
}
if (!ConsumeDecimalNumber(&in, &key) ||
!in.empty() ||
key < next_expected) {
@@ -233,7 +235,7 @@ TEST(CorruptionTest, TableFile) {
dbi->TEST_CompactRange(1, NULL, NULL);
Corrupt(kTableFile, 100, 1);
Check(99, 99);
Check(90, 99);
}
TEST(CorruptionTest, TableFileIndexData) {
@@ -299,7 +301,7 @@ TEST(CorruptionTest, CompactionInputError) {
ASSERT_EQ(1, Property("leveldb.num-files-at-level" + NumberToString(last)));
Corrupt(kTableFile, 100, 1);
Check(9, 9);
Check(5, 9);
// Force compactions by writing lots of values
Build(10000);
@@ -307,33 +309,23 @@ TEST(CorruptionTest, CompactionInputError) {
}
TEST(CorruptionTest, CompactionInputErrorParanoid) {
Options options;
options.paranoid_checks = true;
options.write_buffer_size = 1048576;
Reopen(&options);
options_.paranoid_checks = true;
options_.write_buffer_size = 512 << 10;
Reopen();
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
// Fill levels >= 1 so memtable compaction outputs to level 1
for (int level = 1; level < config::kNumLevels; level++) {
dbi->Put(WriteOptions(), "", "begin");
dbi->Put(WriteOptions(), "~", "end");
// Make multiple inputs so we need to compact.
for (int i = 0; i < 2; i++) {
Build(10);
dbi->TEST_CompactMemTable();
Corrupt(kTableFile, 100, 1);
env_.SleepForMicroseconds(100000);
}
dbi->CompactRange(NULL, NULL);
Build(10);
dbi->TEST_CompactMemTable();
env_.SleepForMicroseconds(1000000);
ASSERT_EQ(1, Property("leveldb.num-files-at-level0"));
Corrupt(kTableFile, 100, 1);
Check(9, 9);
// Write must eventually fail because of corrupted table
Status s;
// Write must fail because of corrupted table
std::string tmp1, tmp2;
for (int i = 0; i < 1000000 && s.ok(); i++) {
s = db_->Put(WriteOptions(), Key(i, &tmp1), Value(i, &tmp2));
}
Status s = db_->Put(WriteOptions(), Key(5, &tmp1), Value(5, &tmp2));
ASSERT_TRUE(!s.ok()) << "write did not fail in corrupted paranoid db";
}

View File

@@ -5,11 +5,13 @@
#include "db_impl.h"
#include <algorithm>
#include <iostream>
#include <set>
#include <string>
#include <stdint.h>
#include <stdio.h>
#include <vector>
#include "builder.h"
#include "db_iter.h"
#include "dbformat.h"
@@ -17,11 +19,13 @@
#include "log_reader.h"
#include "log_writer.h"
#include "memtable.h"
#include "replay_iterator.h"
#include "table_cache.h"
#include "version_set.h"
#include "write_batch_internal.h"
#include "../hyperleveldb/db.h"
#include "../hyperleveldb/env.h"
#include "../hyperleveldb/replay_iterator.h"
#include "../hyperleveldb/status.h"
#include "../hyperleveldb/table.h"
#include "../hyperleveldb/table_builder.h"
@@ -35,6 +39,8 @@
namespace hyperleveldb {
const int kStraightReads = 50;
const int kNumNonTableCacheFiles = 10;
struct DBImpl::CompactionState {
@@ -102,22 +108,23 @@ Options SanitizeOptions(const std::string& dbname,
return result;
}
DBImpl::DBImpl(const Options& options, const std::string& dbname)
: env_(options.env),
internal_comparator_(options.comparator),
internal_filter_policy_(options.filter_policy),
options_(SanitizeOptions(
dbname, &internal_comparator_, &internal_filter_policy_, options)),
owns_info_log_(options_.info_log != options.info_log),
owns_cache_(options_.block_cache != options.block_cache),
DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
: env_(raw_options.env),
internal_comparator_(raw_options.comparator),
internal_filter_policy_(raw_options.filter_policy),
options_(SanitizeOptions(dbname, &internal_comparator_,
&internal_filter_policy_, raw_options)),
owns_info_log_(options_.info_log != raw_options.info_log),
owns_cache_(options_.block_cache != raw_options.block_cache),
dbname_(dbname),
db_lock_(NULL),
shutting_down_(NULL),
mem_(new MemTable(internal_comparator_)),
imm_(NULL),
logfile_(NULL),
logfile_(),
logfile_number_(0),
log_(NULL),
log_(),
seed_(0),
writers_lower_(0),
writers_upper_(0),
bg_fg_cv_(&mutex_),
@@ -130,17 +137,24 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
bg_log_cv_(&mutex_),
bg_log_occupied_(false),
manual_compaction_(NULL),
manual_garbage_cutoff_(raw_options.manual_garbage_collection ?
SequenceNumber(0) : kMaxSequenceNumber),
straight_reads_(0),
backup_cv_(&mutex_),
backup_in_progress_(),
backup_deferred_delete_(),
consecutive_compaction_errors_(0) {
mutex_.Lock();
mem_->Ref();
has_imm_.Release_Store(NULL);
backup_in_progress_.Release_Store(NULL);
env_->StartThread(&DBImpl::CompactMemTableWrapper, this);
env_->StartThread(&DBImpl::CompactOptimisticWrapper, this);
env_->StartThread(&DBImpl::CompactLevelWrapper, this);
num_bg_threads_ = 3;
// Reserve ten files or so for other uses and give the rest to TableCache.
const int table_cache_size = options.max_open_files - kNumNonTableCacheFiles;
const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles;
table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
versions_ = new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_);
@@ -170,8 +184,8 @@ DBImpl::~DBImpl() {
delete versions_;
if (mem_ != NULL) mem_->Unref();
if (imm_ != NULL) imm_->Unref();
delete log_;
delete logfile_;
log_.reset();
logfile_.reset();
delete table_cache_;
if (owns_info_log_) {
@@ -224,6 +238,16 @@ void DBImpl::MaybeIgnoreError(Status* s) const {
}
void DBImpl::DeleteObsoleteFiles() {
// Defer if there's background activity
mutex_.AssertHeld();
if (backup_in_progress_.Acquire_Load() != NULL) {
backup_deferred_delete_ = true;
return;
}
// If you ever release mutex_ in this function, you'll need to do more work in
// LiveBackup
// Make a set of all of the live files
std::set<uint64_t> live = pending_outputs_;
versions_->AddLiveFiles(&live);
@@ -651,7 +675,7 @@ void DBImpl::CompactLevelThread() {
while (!shutting_down_.Acquire_Load()) {
while (!shutting_down_.Acquire_Load() &&
manual_compaction_ == NULL &&
!versions_->NeedsCompaction(levels_locked_)) {
!versions_->NeedsCompaction(levels_locked_, straight_reads_ > kStraightReads)) {
bg_compaction_cv_.Wait();
}
if (shutting_down_.Acquire_Load()) {
@@ -709,9 +733,9 @@ Status DBImpl::BackgroundCompaction() {
(m->end ? m->end->DebugString().c_str() : "(end)"),
(m->done ? "(end)" : manual_end.DebugString().c_str()));
} else {
int level = versions_->PickCompactionLevel(levels_locked_);
int level = versions_->PickCompactionLevel(levels_locked_, straight_reads_ > kStraightReads);
if (level != config::kNumLevels) {
c = versions_->PickCompaction(level);
c = versions_->PickCompaction(versions_->current(), level);
}
if (c) {
assert(!levels_locked_[c->level() + 0]);
@@ -831,7 +855,7 @@ Status DBImpl::OptimisticCompaction() {
if (levels_locked_[level] || levels_locked_[level + 1]) {
continue;
}
Compaction* tmp = versions_->PickCompaction(level);
Compaction* tmp = versions_->PickCompaction(versions_->current(), level);
if (tmp && tmp->IsTrivialMove()) {
if (c) {
delete c;
@@ -1055,6 +1079,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
std::string current_user_key;
bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
uint64_t i = 0;
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
Slice key = input->key();
// Handle key/value, add to state, etc.
@@ -1074,6 +1099,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
last_sequence_for_key = kMaxSequenceNumber;
}
// Just remember that last_sequence_for_key is decreasing over time, and
// all of this makes sense.
if (last_sequence_for_key <= compact->smallest_snapshot) {
// Hidden by an newer entry for same user key
drop = true; // (A)
@@ -1090,6 +1118,15 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
drop = true;
}
// If we're going to drop this key, and there was no previous version of
// this key, and it was written at or after the garbage cutoff, we keep
// it.
if (drop &&
last_sequence_for_key == kMaxSequenceNumber &&
ikey.sequence >= manual_garbage_cutoff_) {
drop = false;
}
last_sequence_for_key = ikey.sequence;
}
@@ -1174,10 +1211,14 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
}
} // namespace
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
SequenceNumber* latest_snapshot) {
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, uint64_t number,
SequenceNumber* latest_snapshot,
uint32_t* seed, bool external_sync) {
IterState* cleanup = new IterState;
mutex_.Lock();
if (!external_sync) {
mutex_.Lock();
}
++straight_reads_;
*latest_snapshot = versions_->LastSequence();
// Collect together all needed child iterators
@@ -1188,7 +1229,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
list.push_back(imm_->NewIterator());
imm_->Ref();
}
versions_->current()->AddIterators(options, &list);
versions_->current()->AddSomeIterators(options, number, &list);
Iterator* internal_iter =
NewMergingIterator(&internal_comparator_, &list[0], list.size());
versions_->current()->Ref();
@@ -1199,13 +1240,17 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
cleanup->version = versions_->current();
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);
mutex_.Unlock();
*seed = ++seed_;
if (!external_sync) {
mutex_.Unlock();
}
return internal_iter;
}
Iterator* DBImpl::TEST_NewInternalIterator() {
SequenceNumber ignored;
return NewInternalIterator(ReadOptions(), &ignored);
uint32_t ignored_seed;
return NewInternalIterator(ReadOptions(), 0, &ignored, &ignored_seed, false);
}
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
@@ -1251,6 +1296,10 @@ Status DBImpl::Get(const ReadOptions& options,
mutex_.Lock();
}
if (have_stat_update && current->UpdateStats(stats)) {
bg_compaction_cv_.Signal();
}
++straight_reads_;
mem->Unref();
if (imm != NULL) imm->Unref();
current->Unref();
@@ -1259,12 +1308,157 @@ Status DBImpl::Get(const ReadOptions& options,
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot);
uint32_t seed;
Iterator* iter = NewInternalIterator(options, 0, &latest_snapshot, &seed, false);
return NewDBIterator(
&dbname_, env_, user_comparator(), internal_iter,
this, user_comparator(), iter,
(options.snapshot != NULL
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot));
: latest_snapshot),
seed);
}
void DBImpl::GetReplayTimestamp(std::string* timestamp) {
uint64_t file = 0;
uint64_t seqno = 0;
{
MutexLock l(&mutex_);
file = versions_->NewFileNumber();
versions_->ReuseFileNumber(file);
seqno = versions_->LastSequence();
}
timestamp->clear();
PutVarint64(timestamp, file);
PutVarint64(timestamp, seqno);
}
void DBImpl::AllowGarbageCollectBeforeTimestamp(const std::string& timestamp) {
Slice ts_slice(timestamp);
uint64_t file = 0;
uint64_t seqno = 0;
if (timestamp == "all") {
// keep zeroes
} else if (timestamp == "now") {
MutexLock l(&mutex_);
seqno = versions_->LastSequence();
if (manual_garbage_cutoff_ < seqno) {
manual_garbage_cutoff_ = seqno;
}
} else if (GetVarint64(&ts_slice, &file) &&
GetVarint64(&ts_slice, &seqno)) {
MutexLock l(&mutex_);
if (manual_garbage_cutoff_ < seqno) {
manual_garbage_cutoff_ = seqno;
}
}
}
bool DBImpl::ValidateTimestamp(const std::string& ts) {
uint64_t file = 0;
uint64_t seqno = 0;
Slice ts_slice(ts);
return ts == "all" || ts == "now" ||
(GetVarint64(&ts_slice, &file) &&
GetVarint64(&ts_slice, &seqno));
}
int DBImpl::CompareTimestamps(const std::string& lhs, const std::string& rhs) {
uint64_t now = 0;
uint64_t lhs_seqno = 0;
uint64_t rhs_seqno = 0;
uint64_t tmp;
if (lhs == "now" || rhs == "now") {
MutexLock l(&mutex_);
now = versions_->LastSequence();
}
if (lhs == "all") {
lhs_seqno = 0;
} else if (lhs == "now") {
lhs_seqno = now;
} else {
Slice lhs_slice(lhs);
GetVarint64(&lhs_slice, &tmp);
GetVarint64(&lhs_slice, &lhs_seqno);
}
if (rhs == "all") {
rhs_seqno = 0;
} else if (rhs == "now") {
rhs_seqno = now;
} else {
Slice rhs_slice(rhs);
GetVarint64(&rhs_slice, &tmp);
GetVarint64(&rhs_slice, &rhs_seqno);
}
if (lhs_seqno < rhs_seqno) {
return -1;
} else if (lhs_seqno > rhs_seqno) {
return 1;
} else {
return 0;
}
}
Status DBImpl::GetReplayIterator(const std::string& timestamp,
ReplayIterator** iter) {
*iter = NULL;
Slice ts_slice(timestamp);
uint64_t file = 0;
uint64_t seqno = 0;
if (timestamp == "all") {
seqno = 0;
} else if (timestamp == "now") {
MutexLock l(&mutex_);
file = versions_->NewFileNumber();
versions_->ReuseFileNumber(file);
seqno = versions_->LastSequence();
} else if (!GetVarint64(&ts_slice, &file) ||
!GetVarint64(&ts_slice, &seqno)) {
return Status::InvalidArgument("Timestamp is not valid");
}
ReadOptions options;
SequenceNumber latest_snapshot;
uint32_t seed;
MutexLock l(&mutex_);
Iterator* internal_iter = NewInternalIterator(options, file, &latest_snapshot, &seed, true);
internal_iter->SeekToFirst();
ReplayIteratorImpl* iterimpl;
iterimpl = new ReplayIteratorImpl(
this, &mutex_, user_comparator(), internal_iter, mem_, SequenceNumber(seqno));
*iter = iterimpl;
replay_iters_.push_back(iterimpl);
return Status::OK();
}
void DBImpl::ReleaseReplayIterator(ReplayIterator* _iter) {
MutexLock l(&mutex_);
ReplayIteratorImpl* iter = reinterpret_cast<ReplayIteratorImpl*>(_iter);
for (std::list<ReplayIteratorImpl*>::iterator it = replay_iters_.begin();
it != replay_iters_.end(); ++it) {
if (*it == iter) {
iter->cleanup(); // calls delete
replay_iters_.erase(it);
return;
}
}
}
void DBImpl::RecordReadSample(Slice key) {
MutexLock l(&mutex_);
++straight_reads_;
if (versions_->current()->RecordReadSample(key)) {
bg_compaction_cv_.Signal();
}
}
SequenceNumber DBImpl::LastSequence() {
MutexLock l(&mutex_);
return versions_->LastSequence();
}
const Snapshot* DBImpl::GetSnapshot() {
@@ -1294,11 +1488,11 @@ struct DBImpl::Writer {
Writer* next;
uint64_t start_sequence;
uint64_t end_sequence;
WritableFile* logfile;
log::Writer* log;
std::tr1::shared_ptr<WritableFile> logfile;
std::tr1::shared_ptr<log::Writer> log;
MemTable* mem;
WritableFile* old_logfile;
log::Writer* old_log;
std::tr1::shared_ptr<WritableFile> old_logfile;
std::tr1::shared_ptr<log::Writer> old_log;
explicit Writer()
: mtx(),
@@ -1307,11 +1501,11 @@ struct DBImpl::Writer {
next(NULL),
start_sequence(0),
end_sequence(0),
logfile(NULL),
log(NULL),
logfile(),
log(),
mem(NULL),
old_logfile(NULL),
old_log(NULL) {
old_logfile(),
old_log() {
}
~Writer() throw () {
}
@@ -1345,10 +1539,12 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Status DBImpl::SequenceWriteBegin(Writer* w, WriteBatch* updates) {
Status s;
MutexLock l(&mutex_);
straight_reads_ = 0;
bool force = updates == NULL;
bool allow_delay = !force;
w->old_log = NULL;
w->old_logfile = NULL;
bool enqueue_mem = false;
w->old_log.reset();
w->old_logfile.reset();
while (true) {
if (!bg_error_.ok()) {
@@ -1380,14 +1576,15 @@ Status DBImpl::SequenceWriteBegin(Writer* w, WriteBatch* updates) {
}
w->old_log = log_;
w->old_logfile = logfile_;
logfile_ = lfile;
logfile_.reset(lfile);
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
log_.reset(new log::Writer(lfile));
imm_ = mem_;
has_imm_.Release_Store(imm_);
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room
enqueue_mem = true;
break;
}
}
@@ -1405,6 +1602,13 @@ Status DBImpl::SequenceWriteBegin(Writer* w, WriteBatch* updates) {
w->mem->Ref();
}
if (enqueue_mem) {
for (std::list<ReplayIteratorImpl*>::iterator it = replay_iters_.begin();
it != replay_iters_.end(); ++it) {
(*it)->enqueue(mem_, w->start_sequence);
}
}
return s;
}
@@ -1429,8 +1633,8 @@ void DBImpl::SequenceWriteEnd(Writer* w) {
// must do in order: log, logfile
if (w->old_log) {
assert(w->old_logfile);
delete w->old_log;
delete w->old_logfile;
w->old_log.reset();
w->old_logfile.reset();
bg_memtable_cv_.Signal();
}
@@ -1520,6 +1724,107 @@ void DBImpl::GetApproximateSizes(
}
}
// Create a named, consistent on-disk backup of the database under
// <dbname>/backup-<name>/.  Log, descriptor, CURRENT, and info files are
// copied byte-for-byte; live table files are hardlinked.  Returns the
// first error encountered, or OK.
Status DBImpl::LiveBackup(const Slice& _name) {
  // Truncate the caller-supplied name at the first NUL byte so the backup
  // directory component is well-formed.
  Slice name = _name;
  size_t name_sz = 0;
  for (; name_sz < name.size() && name.data()[name_sz] != '\0'; ++name_sz)
    ;
  name = Slice(name.data(), name_sz);
  std::set<uint64_t> live;
  // Take a ticket after all in-flight writers and spin until every writer
  // with an earlier ticket has retired; this pins a consistent sequence
  // number (== ticket) for the backup.
  uint64_t ticket = __sync_add_and_fetch(&writers_upper_, 1);
  while (__sync_fetch_and_add(&writers_lower_, 0) < ticket)
    ;
  {
    MutexLock l(&mutex_);
    versions_->SetLastSequence(ticket);
    // Only one backup may be in progress at a time.
    while (backup_in_progress_.Acquire_Load() != NULL) {
      backup_cv_.Wait();
    }
    backup_in_progress_.Release_Store(this);
    // Wait for exclusive use of the shared log/manifest state.
    while (bg_log_occupied_) {
      bg_log_cv_.Wait();
    }
    bg_log_occupied_ = true;
    // note that this logic assumes that DeleteObsoleteFiles never releases
    // mutex_, so that once we release at this brace, we'll guarantee that it
    // will see backup_in_progress_.  If you change DeleteObsoleteFiles to
    // release mutex_, you'll need to add some sort of synchronization in place
    // of this text block.
    versions_->AddLiveFiles(&live);
    // Retire our own ticket so normal writes can proceed while we copy.
    __sync_fetch_and_add(&writers_lower_, 1);
  }
  Status s;
  std::vector<std::string> filenames;
  s = env_->GetChildren(dbname_, &filenames);
  std::string backup_dir = dbname_ + "/backup-" + name.ToString() + "/";
  if (s.ok()) {
    s = env_->CreateDir(backup_dir);
  }
  uint64_t number;
  FileType type;
  for (size_t i = 0; i < filenames.size(); i++) {
    // Once s is non-ok, skip the remaining files instead of breaking so the
    // loop shape stays simple; no further copies are attempted.
    if (!s.ok()) {
      continue;
    }
    if (ParseFileName(filenames[i], &number, &type)) {
      std::string src = dbname_ + "/" + filenames[i];
      // NOTE(review): backup_dir already ends in '/', so target contains
      // "//"; harmless on POSIX, but the extra separator could be dropped.
      std::string target = backup_dir + "/" + filenames[i];
      switch (type) {
        case kLogFile:
        case kDescriptorFile:
        case kCurrentFile:
        case kInfoLogFile:
          // Mutable metadata files must be copied, not linked.
          s = env_->CopyFile(src, target);
          break;
        case kTableFile:
          // If it's a file referenced by a version, we have logged that version
          // and applied it.  Our MANIFEST will reflect that, and the file
          // number assigned to new files will be greater or equal, ensuring
          // that they aren't overwritten.  Any file not in "live" either exists
          // past the current manifest (output of ongoing compaction) or so far
          // in the past we don't care (we're going to delete it at the end of
          // this backup).  I'd rather play safe than sorry.
          //
          // Under no circumstances should you collapse this to a single
          // LinkFile without the conditional as it has implications for backups
          // that share hardlinks.  Opening an older backup that has files
          // hardlinked with newer backups will overwrite "immutable" files in
          // the newer backups because they aren't in our manifest, and we do an
          // open/write rather than a creat/rename.  We avoid linking these
          // files.
          if (live.find(number) != live.end()) {
            s = env_->LinkFile(src, target);
          }
          break;
        case kTempFile:
        case kDBLockFile:
          // Scratch and lock files are never part of a backup.
          break;
      }
    }
  }
  {
    MutexLock l(&mutex_);
    backup_in_progress_.Release_Store(NULL);
    // Perform any deletions that were deferred while the backup held
    // references to otherwise-obsolete files.
    if (s.ok() && backup_deferred_delete_) {
      DeleteObsoleteFiles();
    }
    backup_deferred_delete_ = false;
    bg_log_occupied_ = false;
    bg_log_cv_.Signal();
    backup_cv_.Signal();
  }
  return s;
}
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
@@ -1551,9 +1856,9 @@ Status DB::Open(const Options& options, const std::string& dbname,
&lfile);
if (s.ok()) {
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
impl->logfile_.reset(lfile);
impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile);
impl->log_.reset(new log::Writer(lfile));
s = impl->versions_->LogAndApply(&edit, &impl->mutex_, &impl->bg_log_cv_, &impl->bg_log_occupied_);
}
if (s.ok()) {

View File

@@ -6,10 +6,14 @@
#define STORAGE_HYPERLEVELDB_DB_DB_IMPL_H_
#include <deque>
#include <list>
#include <set>
#include <tr1/memory>
#include "dbformat.h"
#include "log_writer.h"
#include "snapshot.h"
#include "replay_iterator.h"
#include "../hyperleveldb/db.h"
#include "../hyperleveldb/env.h"
#include "../port/port.h"
@@ -36,11 +40,19 @@ class DBImpl : public DB {
const Slice& key,
std::string* value);
virtual Iterator* NewIterator(const ReadOptions&);
virtual void GetReplayTimestamp(std::string* timestamp);
virtual void AllowGarbageCollectBeforeTimestamp(const std::string& timestamp);
virtual bool ValidateTimestamp(const std::string& timestamp);
virtual int CompareTimestamps(const std::string& lhs, const std::string& rhs);
virtual Status GetReplayIterator(const std::string& timestamp,
ReplayIterator** iter);
virtual void ReleaseReplayIterator(ReplayIterator* iter);
virtual const Snapshot* GetSnapshot();
virtual void ReleaseSnapshot(const Snapshot* snapshot);
virtual bool GetProperty(const Slice& property, std::string* value);
virtual void GetApproximateSizes(const Range* range, int n, uint64_t* sizes);
virtual void CompactRange(const Slice* begin, const Slice* end);
virtual Status LiveBackup(const Slice& name);
// Extra methods (for testing) that are not in the public DB interface
@@ -59,13 +71,23 @@ class DBImpl : public DB {
// file at a level >= 1.
int64_t TEST_MaxNextLevelOverlappingBytes();
// Record a sample of bytes read at the specified internal key.
// Samples are taken approximately once every config::kReadBytesPeriod
// bytes.
void RecordReadSample(Slice key);
// Peek at the last sequence;
// REQUIRES: mutex_ not held
SequenceNumber LastSequence();
private:
friend class DB;
struct CompactionState;
struct Writer;
Iterator* NewInternalIterator(const ReadOptions&,
SequenceNumber* latest_snapshot);
Iterator* NewInternalIterator(const ReadOptions&, uint64_t number,
SequenceNumber* latest_snapshot,
uint32_t* seed, bool external_sync);
Status NewDB();
@@ -139,9 +161,10 @@ class DBImpl : public DB {
MemTable* mem_;
MemTable* imm_; // Memtable being compacted
port::AtomicPointer has_imm_; // So bg thread can detect non-NULL imm_
WritableFile* logfile_;
std::tr1::shared_ptr<WritableFile> logfile_;
uint64_t logfile_number_;
log::Writer* log_;
std::tr1::shared_ptr<log::Writer> log_;
uint32_t seed_; // For sampling.
// Synchronize writers
uint64_t __attribute__ ((aligned (8))) writers_lower_;
@@ -179,8 +202,22 @@ class DBImpl : public DB {
};
ManualCompaction* manual_compaction_;
// Where have we pinned tombstones?
SequenceNumber manual_garbage_cutoff_;
// replay iterators
std::list<ReplayIteratorImpl*> replay_iters_;
// how many reads have we done in a row, uninterrupted by writes
uint64_t straight_reads_;
VersionSet* versions_;
// Information for ongoing backup processes
port::CondVar backup_cv_;
port::AtomicPointer backup_in_progress_; // non-NULL in progress
bool backup_deferred_delete_; // DeleteObsoleteFiles delayed by backup; protect with mutex_
// Have we encountered a background error in paranoid mode?
Status bg_error_;
int consecutive_compaction_errors_;

View File

@@ -5,12 +5,14 @@
#include "db_iter.h"
#include "filename.h"
#include "db_impl.h"
#include "dbformat.h"
#include "../hyperleveldb/env.h"
#include "../hyperleveldb/iterator.h"
#include "../port/port.h"
#include "../util/logging.h"
#include "../util/mutexlock.h"
#include "../util/random.h"
namespace hyperleveldb {
@@ -46,15 +48,16 @@ class DBIter: public Iterator {
kReverse
};
DBIter(const std::string* dbname, Env* env,
const Comparator* cmp, Iterator* iter, SequenceNumber s)
: dbname_(dbname),
env_(env),
DBIter(DBImpl* db, const Comparator* cmp, Iterator* iter, SequenceNumber s,
uint32_t seed)
: db_(db),
user_comparator_(cmp),
iter_(iter),
sequence_(s),
direction_(kForward),
valid_(false) {
valid_(false),
rnd_(seed),
bytes_counter_(RandomPeriod()) {
}
virtual ~DBIter() {
delete iter_;
@@ -100,8 +103,12 @@ class DBIter: public Iterator {
}
}
const std::string* const dbname_;
Env* const env_;
// Pick next gap with average value of config::kReadBytesPeriod.
ssize_t RandomPeriod() {
return rnd_.Uniform(2*config::kReadBytesPeriod);
}
DBImpl* db_;
const Comparator* const user_comparator_;
Iterator* const iter_;
SequenceNumber const sequence_;
@@ -112,13 +119,23 @@ class DBIter: public Iterator {
Direction direction_;
bool valid_;
Random rnd_;
ssize_t bytes_counter_;
// No copying allowed
DBIter(const DBIter&);
void operator=(const DBIter&);
};
inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
if (!ParseInternalKey(iter_->key(), ikey)) {
Slice k = iter_->key();
ssize_t n = k.size() + iter_->value().size();
bytes_counter_ -= n;
while (bytes_counter_ < 0) {
bytes_counter_ += RandomPeriod();
db_->RecordReadSample(k);
}
if (!ParseInternalKey(k, ikey)) {
status_ = Status::Corruption("corrupted internal key in DBIter");
return false;
} else {
@@ -129,6 +146,9 @@ inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
void DBIter::Next() {
assert(valid_);
// Temporarily use saved_key_ as storage for key to skip.
std::string* skip = &saved_key_;
if (direction_ == kReverse) { // Switch directions?
direction_ = kForward;
// iter_ is pointing just before the entries for this->key(),
@@ -144,11 +164,13 @@ void DBIter::Next() {
saved_key_.clear();
return;
}
} else {
// if we did not reverse direction, save the current key as the one which we
// must advance past. If we did reverse direction, it's already been set,
// so we don't need to do it. This conditional solves upstream issue #200.
SaveKey(ExtractUserKey(iter_->key()), skip);
}
// Temporarily use saved_key_ as storage for key to skip.
std::string* skip = &saved_key_;
SaveKey(ExtractUserKey(iter_->key()), skip);
FindNextUserEntry(true, skip);
}
@@ -288,12 +310,12 @@ void DBIter::SeekToLast() {
} // anonymous namespace
Iterator* NewDBIterator(
const std::string* dbname,
Env* env,
DBImpl* db,
const Comparator* user_key_comparator,
Iterator* internal_iter,
const SequenceNumber& sequence) {
return new DBIter(dbname, env, user_key_comparator, internal_iter, sequence);
SequenceNumber sequence,
uint32_t seed) {
return new DBIter(db, user_key_comparator, internal_iter, sequence, seed);
}
} // namespace hyperleveldb

View File

@@ -11,15 +11,17 @@
namespace hyperleveldb {
class DBImpl;
// Return a new iterator that converts internal keys (yielded by
// "*internal_iter") that were live at the specified "sequence" number
// into appropriate user keys.
extern Iterator* NewDBIterator(
const std::string* dbname,
Env* env,
DBImpl* db,
const Comparator* user_key_comparator,
Iterator* internal_iter,
const SequenceNumber& sequence);
SequenceNumber sequence,
uint32_t seed);
} // namespace hyperleveldb

View File

@@ -614,6 +614,47 @@ TEST(DBTest, GetPicksCorrectFile) {
} while (ChangeOptions());
}
#if 0
// Regression test (currently disabled via #if 0): seek-driven compaction
// of a level-0 table must be attributed to level 0 even when level 1 is
// empty and the overlapping table lives in level 2.
TEST(DBTest, GetEncountersEmptyLevel) {
  do {
    // Arrange for the following to happen:
    //   * sstable A in level 0
    //   * nothing in level 1
    //   * sstable B in level 2
    // Then do enough Get() calls to arrange for an automatic compaction
    // of sstable A.  A bug would cause the compaction to be marked as
    // occurring at level 1 (instead of the correct level 0).

    // Step 1: First place sstables in levels 0 and 2
    int compaction_count = 0;
    while (NumTableFilesAtLevel(0) == 0 ||
           NumTableFilesAtLevel(2) == 0) {
      ASSERT_LE(compaction_count, 100) << "could not fill levels 0 and 2";
      compaction_count++;
      Put("a", "begin");
      Put("z", "end");
      dbfull()->TEST_CompactMemTable();
    }

    // Step 2: clear level 1 if necessary.
    dbfull()->TEST_CompactRange(1, NULL, NULL);
    ASSERT_EQ(NumTableFilesAtLevel(0), 1);
    ASSERT_EQ(NumTableFilesAtLevel(1), 0);
    ASSERT_EQ(NumTableFilesAtLevel(2), 1);

    // Step 3: read a bunch of times
    for (int i = 0; i < 1000; i++) {
      ASSERT_EQ("NOT_FOUND", Get("missing"));
    }

    // Step 4: Wait for compaction to finish
    env_->SleepForMicroseconds(1000000);

    ASSERT_EQ(NumTableFilesAtLevel(0), 0);
  } while (ChangeOptions());
}
#endif
TEST(DBTest, IterEmpty) {
Iterator* iter = db_->NewIterator(ReadOptions());
@@ -1793,6 +1834,23 @@ class ModelDB: public DB {
return new ModelIter(snapshot_state, false);
}
}
  // Replay-API stubs: ModelDB is an in-memory reference implementation used
  // only to cross-check real DB behavior, so replay timestamps are no-ops.
  virtual void GetReplayTimestamp(std::string* timestamp) {
  }
  virtual void AllowGarbageCollectBeforeTimestamp(const std::string& timestamp) {
  }
  // Every timestamp is reported invalid by the model.
  virtual bool ValidateTimestamp(const std::string&) {
    return false;
  }
  // All timestamps compare equal in the model.
  virtual int CompareTimestamps(const std::string&, const std::string&) {
    return 0;
  }
  // Hands back a NULL iterator; the model does not support replay.
  virtual Status GetReplayIterator(const std::string& timestamp,
                                   ReplayIterator** iter) {
    *iter = NULL;
    return Status::OK();
  }
  virtual void ReleaseReplayIterator(ReplayIterator* iter) {
  }
virtual const Snapshot* GetSnapshot() {
ModelSnapshot* snapshot = new ModelSnapshot;
snapshot->map_ = map_;
@@ -1828,6 +1886,8 @@ class ModelDB: public DB {
}
virtual void CompactRange(const Slice* start, const Slice* end) {
}
virtual Status LiveBackup(const Slice& name) {
}
private:
class ModelIter: public Iterator {
@@ -1990,6 +2050,71 @@ TEST(DBTest, Randomized) {
} while (ChangeOptions());
}
// Exercises GetReplayIterator: replay starts from a timestamp taken before
// any writes, follows writes across a memtable dump, and reports deletions
// via HasValue() == false.
// Bug fix: the iterator obtained from GetReplayIterator was never released,
// leaking the iterator (and the memtable references it holds) at the end of
// the test; ReleaseReplayIterator is now called before returning.
TEST(DBTest, Replay) {
  std::string ts;
  db_->GetReplayTimestamp(&ts);
  ASSERT_OK(Put("key", "v0"));
  ASSERT_OK(Put("key", "v1"));
  ASSERT_OK(Put("key", "v2"));
  ASSERT_OK(Put("key", "v3"));
  ASSERT_OK(Put("key", "v4"));
  ASSERT_OK(Put("key", "v5"));
  ASSERT_OK(Put("key", "v6"));
  ASSERT_OK(Put("key", "v7"));
  ASSERT_OK(Put("key", "v8"));
  ASSERT_OK(Put("key", "v9"));
  // get the iterator
  ReplayIterator* iter = NULL;
  ASSERT_OK(db_->GetReplayIterator(ts, &iter));
  // Iterate over what was there to start with
  ASSERT_TRUE(iter->Valid());
  ASSERT_TRUE(iter->HasValue());
  ASSERT_EQ("key", iter->key().ToString());
  ASSERT_EQ("v9", iter->value().ToString());
  iter->Next();
  // The implementation is allowed to return things twice.
  // This is a case where it will.
  ASSERT_TRUE(iter->Valid());
  ASSERT_TRUE(iter->HasValue());
  ASSERT_EQ("key", iter->key().ToString());
  ASSERT_EQ("v9", iter->value().ToString());
  iter->Next();
  // Now it's no longer valid.
  ASSERT_TRUE(!iter->Valid());
  ASSERT_TRUE(!iter->Valid());
  // Add another and iterate some more
  ASSERT_OK(Put("key", "v10"));
  ASSERT_TRUE(iter->Valid());
  ASSERT_TRUE(iter->HasValue());
  ASSERT_EQ("key", iter->key().ToString());
  ASSERT_EQ("v10", iter->value().ToString());
  iter->Next();
  ASSERT_TRUE(!iter->Valid());
  // Dump the memtable
  dbfull()->TEST_CompactMemTable();
  // Write into the new MemTable and iterate some more
  ASSERT_OK(Put("key", "v11"));
  ASSERT_TRUE(iter->Valid());
  ASSERT_TRUE(iter->HasValue());
  ASSERT_EQ("key", iter->key().ToString());
  ASSERT_EQ("v11", iter->value().ToString());
  iter->Next();
  ASSERT_TRUE(!iter->Valid());
  // What does it do on delete?
  ASSERT_OK(Delete("key"));
  ASSERT_TRUE(iter->Valid());
  ASSERT_TRUE(!iter->HasValue());
  ASSERT_EQ("key", iter->key().ToString());
  iter->Next();
  ASSERT_TRUE(!iter->Valid());
  // Release the iterator so the DB can drop its memtable references.
  db_->ReleaseReplayIterator(iter);
}
std::string MakeKey(unsigned int num) {
char buf[30];
snprintf(buf, sizeof(buf), "%016u", num);

View File

@@ -40,6 +40,9 @@ static const int kL0_StopWritesTrigger = 12;
// space if the same key space is being repeatedly overwritten.
static const int kMaxMemCompactLevel = 2;
// Approximate gap in bytes between samples of data read during iteration.
static const int kReadBytesPeriod = 1048576;
} // namespace config
class InternalKey;

View File

@@ -19,6 +19,10 @@ static Slice GetLengthPrefixedSlice(const char* data) {
return Slice(p, len);
}
static Slice GetLengthPrefixedSlice(std::pair<uint64_t, const char*> tk) {
return GetLengthPrefixedSlice(tk.second);
}
MemTable::MemTable(const InternalKeyComparator& cmp)
: comparator_(cmp),
refs_(0),
@@ -34,11 +38,16 @@ size_t MemTable::ApproximateMemoryUsage() {
return arena_.MemoryUsage();
}
int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr)
int MemTable::KeyComparator::operator()(TableKey ak, TableKey bk)
const {
if (ak.first < bk.first) {
return -1;
} else if (ak.first > bk.first) {
return 1;
}
// Internal keys are encoded as length-prefixed strings.
Slice a = GetLengthPrefixedSlice(aptr);
Slice b = GetLengthPrefixedSlice(bptr);
Slice a = GetLengthPrefixedSlice(ak);
Slice b = GetLengthPrefixedSlice(bk);
return comparator.Compare(a, b);
}
@@ -54,10 +63,15 @@ static const char* EncodeKey(std::string* scratch, const Slice& target) {
class MemTableIterator: public Iterator {
public:
explicit MemTableIterator(MemTable::Table* table) : iter_(table) { }
explicit MemTableIterator(MemTable::Table* table,
MemTable::KeyComparator* cmp)
: iter_(table), comparator_(cmp) { }
virtual bool Valid() const { return iter_.Valid(); }
virtual void Seek(const Slice& k) { iter_.Seek(EncodeKey(&tmp_, k)); }
virtual void Seek(const Slice& k) {
uint64_t keynum = comparator_->comparator.user_comparator()->KeyNum(Slice(k.data(), k.size() - 8));
iter_.Seek(std::make_pair(keynum, EncodeKey(&tmp_, k)));
}
virtual void SeekToFirst() { iter_.SeekToFirst(); }
virtual void SeekToLast() { iter_.SeekToLast(); }
virtual void Next() { iter_.Next(); }
@@ -72,6 +86,7 @@ class MemTableIterator: public Iterator {
private:
MemTable::Table::Iterator iter_;
MemTable::KeyComparator* comparator_;
std::string tmp_; // For passing to EncodeKey
// No copying allowed
@@ -80,7 +95,7 @@ class MemTableIterator: public Iterator {
};
Iterator* MemTable::NewIterator() {
return new MemTableIterator(&table_);
return new MemTableIterator(&table_, &comparator_);
}
void MemTable::Add(SequenceNumber s, ValueType type,
@@ -112,18 +127,22 @@ void MemTable::Add(SequenceNumber s, ValueType type,
p = EncodeVarint32(p, val_size);
memcpy(p, value.data(), val_size);
assert((p + val_size) - buf == encoded_len);
Table::InsertHint ih(&table_, buf);
uint64_t keynum = comparator_.comparator.user_comparator()->KeyNum(key);
TableKey tk(keynum, buf);
Table::InsertHint ih(&table_, tk);
{
MutexLock l(&mtx_);
table_.InsertWithHint(&ih, buf);
table_.InsertWithHint(&ih, tk);
}
}
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
Slice memkey = key.memtable_key();
Table::Iterator iter(&table_);
iter.Seek(memkey.data());
uint64_t keynum = comparator_.comparator.user_comparator()->KeyNum(key.user_key());
TableKey tk(keynum, memkey.data());
iter.Seek(tk);
if (iter.Valid()) {
// entry format is:
// klength varint32
@@ -134,10 +153,11 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
const char* entry = iter.key();
const char* entry = iter.key().second;
uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length);
if (comparator_.comparator.user_comparator()->Compare(
if (iter.key().first == tk.first &&
comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8),
key.user_key()) == 0) {
// Correct user key

View File

@@ -67,16 +67,17 @@ class MemTable {
private:
~MemTable(); // Private since only Unref() should be used to delete it
typedef std::pair<uint64_t, const char*> TableKey;
struct KeyComparator {
const InternalKeyComparator comparator;
explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { }
int operator()(const char* a, const char* b) const;
int operator()(TableKey a, TableKey b) const;
};
friend class MemTableIterator;
friend class MemTableBackwardIterator;
typedef SkipList<const char*, KeyComparator> Table;
typedef SkipList<TableKey, KeyComparator> Table;
KeyComparator comparator_;
int refs_;

View File

@@ -0,0 +1,204 @@
// Copyright (c) 2013 The HyperLevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "replay_iterator.h"
#include "filename.h"
#include "db_impl.h"
#include "dbformat.h"
#include "../hyperleveldb/env.h"
#include "../hyperleveldb/iterator.h"
#include "../port/port.h"
#include "../util/logging.h"
#include "../util/mutexlock.h"
#include "../util/random.h"
namespace hyperleveldb {
// ReplayIterator is an abstract interface; there is nothing to initialize
// or tear down here.
ReplayIterator::ReplayIterator() {
}

ReplayIterator::~ReplayIterator() {
}
// Source backed by an iterator over already-persisted state: no memtable,
// replay entries with sequence in [s, l].
ReplayState::ReplayState(Iterator* i, SequenceNumber s, SequenceNumber l)
  : mem_(NULL),
    iter_(i),
    seq_start_(s),
    seq_limit_(l) {
}

// Source backed by a live memtable; the sequence limit is filled in later,
// when the memtable is drained (see ReplayIteratorImpl::Prime).
ReplayState::ReplayState(MemTable* m, SequenceNumber s)
  : mem_(m),
    iter_(NULL),
    seq_start_(s),
    seq_limit_(0) {
}
// Construct a replay iterator that starts at sequence s, draining `iter`
// (persistent state) first and then memtable `m`, which is Ref()ed here.
// REQUIRES: caller holds `mutex` (the DBImpl mutex).
ReplayIteratorImpl::ReplayIteratorImpl(DBImpl* db, port::Mutex* mutex, const Comparator* cmp,
    Iterator* iter, MemTable* m, SequenceNumber s)
  : ReplayIterator(),
    db_(db),
    mutex_(mutex),
    user_comparator_(cmp),
    start_at_(s),
    valid_(),
    status_(),
    has_current_user_key_(false),
    current_user_key_(),
    current_user_sequence_(),
    rs_(iter, s, kMaxSequenceNumber),
    mems_() {
  m->Ref();
  mems_.push_back(ReplayState(m, s));
}

// Private: instances destroy themselves via cleanup(), which releases the
// owned iterators/memtables before `delete this`.
ReplayIteratorImpl::~ReplayIteratorImpl() {
}
// True iff the iterator is positioned at a replayable entry.  Prime()
// advances through the current source (and any queued memtables) to find
// the next such entry.
bool ReplayIteratorImpl::Valid() {
  Prime();
  return valid_;
}

// Step past the current entry of the underlying internal-key iterator.
// Callers follow the usual iterator contract (Valid() before Next()).
void ReplayIteratorImpl::Next() {
  rs_.iter_->Next();
}

// True if the current entry is a value write; false for a deletion (or an
// unparseable key, in which case ParseKey records corruption in status_).
bool ReplayIteratorImpl::HasValue() {
  ParsedInternalKey ikey;
  return ParseKey(&ikey) && ikey.type == kTypeValue;
}

// User key of the current entry.
Slice ReplayIteratorImpl::key() const {
  assert(valid_);
  return ExtractUserKey(rs_.iter_->key());
}

// Value of the current entry; meaningful only when HasValue() is true.
Slice ReplayIteratorImpl::value() const {
  assert(valid_);
  return rs_.iter_->value();
}

// First error this iterator recorded, otherwise the source's status.
Status ReplayIteratorImpl::status() const {
  if (!status_.ok()) {
    return status_;
  } else {
    return rs_.iter_->status();
  }
}
// Queue a new memtable whose writes begin at sequence s; we take our own
// reference.  REQUIRES: caller holds the mutex passed into the ctor.
void ReplayIteratorImpl::enqueue(MemTable* m, SequenceNumber s) {
  m->Ref();
  mems_.push_back(ReplayState(m, s));
}
// Release every owned iterator and memtable reference, then self-destruct.
// REQUIRES: caller holds the mutex passed into the ctor.
// The mutex is dropped around each delete/Unref because those may do
// non-trivial work (e.g. freeing an entire memtable).
void ReplayIteratorImpl::cleanup() {
  mutex_->Unlock();
  if (rs_.iter_) {
    delete rs_.iter_;
  }
  if (rs_.mem_) {
    rs_.mem_->Unref();
  }
  mutex_->Lock();
  rs_.iter_ = NULL;
  rs_.mem_ = NULL;
  // Drain the queue of pending memtables, dropping the lock around each
  // release and re-taking it before touching mems_ again.
  while (!mems_.empty()) {
    MemTable* mem = mems_.front().mem_;
    Iterator* iter = mems_.front().iter_;
    mutex_->Unlock();
    if (iter) {
      delete iter;
    }
    if (mem) {
      mem->Unref();
    }
    mutex_->Lock();
    mems_.pop_front();
  }
  delete this;
}
// Parse the current source key into *ikey.
bool ReplayIteratorImpl::ParseKey(ParsedInternalKey* ikey) {
  return ParseKey(rs_.iter_->key(), ikey);
}

// Parse k into *ikey; on failure record corruption in status_ and return
// false so callers stop iterating.
bool ReplayIteratorImpl::ParseKey(const Slice& k, ParsedInternalKey* ikey) {
  if (!ParseInternalKey(k, ikey)) {
    status_ = Status::Corruption("corrupted internal key in ReplayIteratorImpl");
    return false;
  } else {
    return true;
  }
}
// Position the iterator at the next replayable entry, switching from the
// current source to queued memtables as each source is exhausted.  Sets
// valid_ accordingly; a parse/iterator error sets status_ and stops.
void ReplayIteratorImpl::Prime() {
  valid_ = false;
  if (!status_.ok()) {
    return;
  }
  while (true) {
    assert(rs_.iter_);
    // Scan the current source for the next acceptable entry.
    while (rs_.iter_->Valid()) {
      ParsedInternalKey ikey;
      if (!ParseKey(rs_.iter_->key(), &ikey)) {
        return;
      }
      // if we can consider this key, and it's recent enough and of the right
      // type:  accept the entry when either the user key differs from the
      // last one we emitted, or its sequence is not older than the last
      // emitted sequence; and it falls at/after seq_start_ and is a plain
      // value or deletion.
      if ((!has_current_user_key_ ||
           user_comparator_->Compare(ikey.user_key,
                                     Slice(current_user_key_)) != 0 ||
           ikey.sequence >= current_user_sequence_) &&
          (ikey.sequence >= rs_.seq_start_ &&
           (ikey.type == kTypeDeletion || ikey.type == kTypeValue))) {
        has_current_user_key_ = true;
        current_user_key_.assign(ikey.user_key.data(), ikey.user_key.size());
        current_user_sequence_ = ikey.sequence;
        valid_ = true;
        return;
      }
      rs_.iter_->Next();
    }
    if (!rs_.iter_->status().ok()) {
      status_ = rs_.iter_->status();
      valid_ = false;
      return;
    }
    // we're done with rs_.iter_; reset the dedup state before switching
    // sources.
    has_current_user_key_ = false;
    current_user_key_.assign("", 0);
    current_user_sequence_ = kMaxSequenceNumber;
    delete rs_.iter_;
    rs_.iter_ = NULL;
    {
      MutexLock l(mutex_);
      // Either stay on the current memtable (re-scanning from the old
      // limit) or move to the next queued memtable.
      if (mems_.empty() ||
          rs_.seq_limit_ < mems_.front().seq_start_) {
        rs_.seq_start_ = rs_.seq_limit_;
      } else {
        if (rs_.mem_) {
          rs_.mem_->Unref();
          rs_.mem_ = NULL;
        }
        rs_.mem_ = mems_.front().mem_;
        rs_.seq_start_ = mems_.front().seq_start_;
        mems_.pop_front();
      }
    }
    // Replay everything written up to now; new writes will be seen the
    // next time Prime() is called.
    rs_.seq_limit_ = db_->LastSequence();
    rs_.iter_ = rs_.mem_->NewIterator();
    rs_.iter_->SeekToFirst();
    assert(rs_.seq_start_ <= rs_.seq_limit_);
    // Nothing new to replay yet.
    if (rs_.seq_start_ == rs_.seq_limit_) {
      valid_ = false;
      return;
    }
  }
}
} // namespace hyperleveldb

View File

@@ -0,0 +1,71 @@
// Copyright (c) 2013 The HyperLevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef STORAGE_HYPERLEVELDB_DB_ROLLING_ITERATOR_H_
#define STORAGE_HYPERLEVELDB_DB_ROLLING_ITERATOR_H_
#include <stdint.h>
#include <list>
#include "../hyperleveldb/db.h"
#include "../hyperleveldb/replay_iterator.h"
#include "dbformat.h"
#include "memtable.h"
namespace hyperleveldb {
class DBImpl;
// One source of replayable updates: either an iterator over persisted
// state (mem_ == NULL) or a live memtable.  seq_start_ is the first
// sequence number to replay from this source; seq_limit_ is the upper
// bound used while draining (see ReplayIteratorImpl::Prime).
struct ReplayState {
  ReplayState(Iterator* i, SequenceNumber s, SequenceNumber l);
  ReplayState(MemTable* m, SequenceNumber s);

  MemTable* mem_;
  Iterator* iter_;
  SequenceNumber seq_start_;
  SequenceNumber seq_limit_;
};
// Concrete ReplayIterator.  Replays every write (value or deletion) made
// at or after a starting sequence number: first from an internal-key
// iterator over persistent state, then from memtables that DBImpl hands
// over via enqueue() as new writes arrive.
class ReplayIteratorImpl : public ReplayIterator {
 public:
  // Refs the memtable on its own; caller must hold mutex while creating this
  ReplayIteratorImpl(DBImpl* db, port::Mutex* mutex, const Comparator* cmp,
      Iterator* iter, MemTable* m, SequenceNumber s);
  virtual bool Valid();
  virtual void Next();
  virtual bool HasValue();
  virtual Slice key() const;
  virtual Slice value() const;
  virtual Status status() const;

  // extra interface

  // we ref the memtable; caller holds mutex passed into ctor
  // REQUIRES: caller must hold mutex passed into ctor
  void enqueue(MemTable* m, SequenceNumber s);
  // REQUIRES: caller must hold mutex passed into ctor
  void cleanup(); // calls delete this;

 private:
  // Private: instances are destroyed only through cleanup().
  virtual ~ReplayIteratorImpl();
  bool ParseKey(ParsedInternalKey* ikey);
  bool ParseKey(const Slice& k, ParsedInternalKey* ikey);
  void Prime();

  DBImpl* const db_;                // parent DB; used to read LastSequence()
  port::Mutex* mutex_;              // DBImpl mutex guarding mems_
  const Comparator* const user_comparator_;
  SequenceNumber const start_at_;   // replay everything >= this sequence
  bool valid_;
  Status status_;
  bool has_current_user_key_;       // dedup state used by Prime()
  std::string current_user_key_;
  SequenceNumber current_user_sequence_;
  ReplayState rs_;                  // source currently being replayed
  std::list<ReplayState> mems_;     // queued memtables, oldest first
};
} // namespace hyperleveldb

#endif // STORAGE_HYPERLEVELDB_DB_ROLLING_ITERATOR_H_

View File

@@ -351,7 +351,7 @@ template<typename Key, class Comparator>
SkipList<Key,Comparator>::SkipList(Comparator cmp, Arena* arena)
: compare_(cmp),
arena_(arena),
head_(NewNode(0 /* any key will do */, kMaxHeight)),
head_(NewNode(Key(), kMaxHeight)),
max_height_(reinterpret_cast<void*>(1)),
rnd_(0xdeadbeef) {
for (int i = 0; i < kMaxHeight; i++) {

View File

@@ -176,30 +176,41 @@ bool SomeFileOverlapsRange(
// is the largest key that occurs in the file, and value() is an
// 16-byte value containing the file number and file size, both
// encoded using EncodeFixed64.
//
// If num != 0, then do not call SeekToLast, Prev
class Version::LevelFileNumIterator : public Iterator {
public:
LevelFileNumIterator(const InternalKeyComparator& icmp,
const std::vector<FileMetaData*>* flist)
const std::vector<FileMetaData*>* flist,
uint64_t num)
: icmp_(icmp),
flist_(flist),
index_(flist->size()) { // Marks as invalid
index_(flist->size()), // Marks as invalid
number_(num) {
}
virtual bool Valid() const {
return index_ < flist_->size();
}
virtual void Seek(const Slice& target) {
index_ = FindFile(icmp_, *flist_, target);
Bump();
}
virtual void SeekToFirst() {
index_ = 0;
Bump();
}
virtual void SeekToFirst() { index_ = 0; }
virtual void SeekToLast() {
assert(number_ == 0);
index_ = flist_->empty() ? 0 : flist_->size() - 1;
}
virtual void Next() {
assert(Valid());
index_++;
Bump();
}
virtual void Prev() {
assert(Valid());
assert(number_ == 0);
if (index_ == 0) {
index_ = flist_->size(); // Marks as invalid
} else {
@@ -218,9 +229,16 @@ class Version::LevelFileNumIterator : public Iterator {
}
virtual Status status() const { return Status::OK(); }
private:
void Bump() {
while (index_ < flist_->size() &&
(*flist_)[index_]->number < number_) {
++index_;
}
}
const InternalKeyComparator icmp_;
const std::vector<FileMetaData*>* const flist_;
uint32_t index_;
uint64_t number_;
// Backing store for value(). Holds the file number and size.
mutable char value_buf_[16];
@@ -241,14 +259,19 @@ static Iterator* GetFileIterator(void* arg,
}
Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
int level) const {
int level, uint64_t num) const {
return NewTwoLevelIterator(
new LevelFileNumIterator(vset_->icmp_, &files_[level]),
new LevelFileNumIterator(vset_->icmp_, &files_[level], num),
&GetFileIterator, vset_->table_cache_, options);
}
void Version::AddIterators(const ReadOptions& options,
std::vector<Iterator*>* iters) {
return AddSomeIterators(options, 0, iters);
}
void Version::AddSomeIterators(const ReadOptions& options, uint64_t num,
std::vector<Iterator*>* iters) {
// Merge all level zero files together since they may overlap
for (size_t i = 0; i < files_[0].size(); i++) {
iters->push_back(
@@ -261,7 +284,7 @@ void Version::AddIterators(const ReadOptions& options,
// lazily.
for (int level = 1; level < config::kNumLevels; level++) {
if (!files_[level].empty()) {
iters->push_back(NewConcatenatingIterator(options, level));
iters->push_back(NewConcatenatingIterator(options, level, num));
}
}
}
@@ -300,6 +323,51 @@ static bool NewestFirst(FileMetaData* a, FileMetaData* b) {
return a->number > b->number;
}
// Invoke func(arg, level, f) for every file that may contain user_key,
// newest first (all overlapping level-0 files, then at most one file per
// higher level).  Stops early as soon as func returns false.
void Version::ForEachOverlapping(Slice user_key, Slice internal_key,
                                 void* arg,
                                 bool (*func)(void*, int, FileMetaData*)) {
  // TODO(sanjay): Change Version::Get() to use this function.
  const Comparator* ucmp = vset_->icmp_.user_comparator();

  // Search level-0 in order from newest to oldest.
  std::vector<FileMetaData*> tmp;
  tmp.reserve(files_[0].size());
  for (uint32_t i = 0; i < files_[0].size(); i++) {
    FileMetaData* f = files_[0][i];
    if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
        ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
      tmp.push_back(f);
    }
  }
  if (!tmp.empty()) {
    std::sort(tmp.begin(), tmp.end(), NewestFirst);
    for (uint32_t i = 0; i < tmp.size(); i++) {
      if (!(*func)(arg, 0, tmp[i])) {
        return;
      }
    }
  }

  // Search other levels; files there are disjoint, so at most one file
  // per level can contain the key.
  for (int level = 1; level < config::kNumLevels; level++) {
    size_t num_files = files_[level].size();
    if (num_files == 0) continue;

    // Binary search to find earliest index whose largest key >= internal_key.
    uint32_t index = FindFile(vset_->icmp_, files_[level], internal_key);
    if (index < num_files) {
      FileMetaData* f = files_[level][index];
      if (ucmp->Compare(user_key, f->smallest.user_key()) < 0) {
        // All of "f" is past any data for user_key
      } else {
        if (!(*func)(arg, level, f)) {
          return;
        }
      }
    }
  }
}
Status Version::Get(const ReadOptions& options,
const LookupKey& k,
std::string* value,
@@ -399,6 +467,57 @@ Status Version::Get(const ReadOptions& options,
return Status::NotFound(Slice()); // Use an empty error message for speed
}
bool Version::UpdateStats(const GetStats& stats) {
FileMetaData* f = stats.seek_file;
if (f != NULL) {
f->allowed_seeks--;
if (f->allowed_seeks <= 0 && file_to_compact_ == NULL) {
file_to_compact_ = f;
file_to_compact_level_ = stats.seek_file_level;
return true;
}
}
return false;
}
// Record a sampled read of internal_key.  If at least two files overlap
// the key, charge a seek (via UpdateStats) to the first/newest match so
// heavily-probed overlapping files eventually trigger a compaction.
// Returns true if a compaction may now be needed.
bool Version::RecordReadSample(Slice internal_key) {
  ParsedInternalKey ikey;
  if (!ParseInternalKey(internal_key, &ikey)) {
    return false;
  }

  // Callback state: remembers the first matching file and counts matches.
  struct State {
    GetStats stats;  // Holds first matching file
    int matches;

    static bool Match(void* arg, int level, FileMetaData* f) {
      State* state = reinterpret_cast<State*>(arg);
      state->matches++;
      if (state->matches == 1) {
        // Remember first match.
        state->stats.seek_file = f;
        state->stats.seek_file_level = level;
      }
      // We can stop iterating once we have a second match.
      return state->matches < 2;
    }
  };

  State state;
  state.matches = 0;
  ForEachOverlapping(ikey.user_key, internal_key, &state, &State::Match);

  // Must have at least two matches since we want to merge across
  // files.  But what if we have a single file that contains many
  // overwrites and deletions?  Should we have another mechanism for
  // finding such files?
  if (state.matches >= 2) {
    // 1MB cost is about 1 seek (see comment in Builder::Apply).
    return UpdateStats(state.stats);
  }
  return false;
}
// Increment this version's reference count; callers pair with Unref().
void Version::Ref() {
  ++refs_;
}
@@ -447,6 +566,8 @@ void Version::GetOverlappingInputs(
const InternalKey* begin,
const InternalKey* end,
std::vector<FileMetaData*>* inputs) {
assert(level >= 0);
assert(level < config::kNumLevels);
inputs->clear();
Slice user_begin, user_end;
if (begin != NULL) {
@@ -1180,7 +1301,7 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
} else {
// Create concatenating iterator for the files from this level
list[num++] = NewTwoLevelIterator(
new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]),
new Version::LevelFileNumIterator(icmp_, &c->inputs_[which], 0),
&GetFileIterator, table_cache_, options);
}
}
@@ -1199,20 +1320,21 @@ struct CompactionBoundary {
};
struct CmpByRange {
CmpByRange(const Comparator* cmp) : cmp_(cmp) {}
CmpByRange(const InternalKeyComparator* cmp) : cmp_(cmp) {}
bool operator () (const FileMetaData* lhs, const FileMetaData* rhs) {
int smallest = cmp_->Compare(lhs->smallest.user_key(), rhs->smallest.user_key());
int smallest = cmp_->Compare(lhs->smallest, rhs->smallest);
if (smallest == 0) {
return cmp_->Compare(lhs->largest.user_key(), rhs->largest.user_key()) < 0;
return cmp_->Compare(lhs->largest, rhs->largest) < 0;
}
return smallest < 0;
}
private:
const Comparator* cmp_;
const InternalKeyComparator* cmp_;
};
// Stores the compaction boundaries between level and level + 1
void VersionSet::GetCompactionBoundaries(int level,
void VersionSet::GetCompactionBoundaries(Version* v,
int level,
std::vector<FileMetaData*>* LA,
std::vector<FileMetaData*>* LB,
std::vector<uint64_t>* LA_sizes,
@@ -1220,12 +1342,12 @@ void VersionSet::GetCompactionBoundaries(int level,
std::vector<CompactionBoundary>* boundaries)
{
const Comparator* user_cmp = icmp_.user_comparator();
*LA = current_->files_[level + 0];
*LB = current_->files_[level + 1];
*LA = v->files_[level + 0];
*LB = v->files_[level + 1];
*LA_sizes = std::vector<uint64_t>(LA->size() + 1, 0);
*LB_sizes = std::vector<uint64_t>(LB->size() + 1, 0);
std::sort(LA->begin(), LA->end(), CmpByRange(user_cmp));
std::sort(LB->begin(), LB->end(), CmpByRange(user_cmp));
std::sort(LA->begin(), LA->end(), CmpByRange(&icmp_));
std::sort(LB->begin(), LB->end(), CmpByRange(&icmp_));
boundaries->resize(LA->size());
// compute sizes
@@ -1259,7 +1381,7 @@ void VersionSet::GetCompactionBoundaries(int level,
}
}
int VersionSet::PickCompactionLevel(bool* locked) {
int VersionSet::PickCompactionLevel(bool* locked, bool seek_driven) const {
// Find an unlocked level has score >= 1 where level + 1 has score < 1.
int level = config::kNumLevels;
for (int i = 0; i + 1 < config::kNumLevels; ++i) {
@@ -1267,11 +1389,19 @@ int VersionSet::PickCompactionLevel(bool* locked) {
continue;
}
if (current_->compaction_scores_[i + 0] >= 1.0 &&
current_->compaction_scores_[i + 1] < 1.0) {
(i + 2 >= config::kNumLevels ||
current_->compaction_scores_[i + 1] < 1.0)) {
level = i;
break;
}
}
if (seek_driven &&
level == config::kNumLevels &&
current_->file_to_compact_ != NULL &&
!locked[current_->file_to_compact_level_ + 0] &&
!locked[current_->file_to_compact_level_ + 1]) {
level = current_->file_to_compact_level_;
}
return level;
}
@@ -1279,16 +1409,16 @@ static bool OldestFirst(FileMetaData* a, FileMetaData* b) {
return a->number < b->number;
}
Compaction* VersionSet::PickCompaction(int level) {
Compaction* VersionSet::PickCompaction(Version* v, int level) {
assert(0 <= level && level < config::kNumLevels);
bool trivial = false;
if (current_->files_[level].empty()) {
if (v->files_[level].empty()) {
return NULL;
}
Compaction* c = new Compaction(level);
c->input_version_ = current_;
c->input_version_ = v;
c->input_version_->Ref();
if (level > 0) {
@@ -1297,7 +1427,7 @@ Compaction* VersionSet::PickCompaction(int level) {
std::vector<uint64_t> LA_sizes;
std::vector<uint64_t> LB_sizes;
std::vector<CompactionBoundary> boundaries;
GetCompactionBoundaries(level, &LA, &LB, &LA_sizes, &LB_sizes, &boundaries);
GetCompactionBoundaries(v, level, &LA, &LB, &LA_sizes, &LB_sizes, &boundaries);
// find the best set of files: maximize the ratio of sizeof(LA)/sizeof(LB)
// while keeping sizeof(LA)+sizeof(LB) < some threshold. If there's a tie
@@ -1358,6 +1488,9 @@ Compaction* VersionSet::PickCompaction(int level) {
c->inputs_[1].push_back(LB[i]);
}
c->SetRatio(best_ratio);
// pick the file to compact in this level
} else if (v->file_to_compact_ != NULL) {
c->inputs_[0].push_back(v->file_to_compact_);
// otherwise just pick the file with least overlap
} else {
assert(level >= 0);
@@ -1379,15 +1512,13 @@ Compaction* VersionSet::PickCompaction(int level) {
}
}
} else {
std::vector<FileMetaData*> tmp(current_->files_[0]);
std::vector<FileMetaData*> tmp(v->files_[0]);
std::sort(tmp.begin(), tmp.end(), OldestFirst);
for (size_t i = 0; i < tmp.size() && c->inputs_[0].size() < 32; ++i) {
c->inputs_[0].push_back(tmp[i]);
}
}
assert(!c->inputs_[0].empty());
if (!trivial) {
SetupOtherInputs(c);
}
@@ -1398,13 +1529,13 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
const int level = c->level();
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest);
current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1]);
c->input_version_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1]);
// Update the place where we will do the next compaction for this level.
// We update this immediately instead of waiting for the VersionEdit
// to be applied so that if the compaction fails, we will try a different
// key range next time.
compact_pointer_[level] = largest.Encode().ToString();
//compact_pointer_[level] = largest.Encode().ToString();
c->edit_.SetCompactPointer(level, largest);
}

View File

@@ -64,6 +64,12 @@ class Version {
// REQUIRES: This version has been saved (see VersionSet::SaveTo)
void AddIterators(const ReadOptions&, std::vector<Iterator*>* iters);
// Append to *iters a sequence of iterators that will
// yield a subset of the contents of this Version when merged together.
// Yields only files with number greater or equal to num
// REQUIRES: This version has been saved (see VersionSet::SaveTo)
void AddSomeIterators(const ReadOptions&, uint64_t num, std::vector<Iterator*>* iters);
// Lookup the value for key. If found, store it in *val and
// return OK. Else return a non-OK status. Fills *stats.
// REQUIRES: lock is not held
@@ -74,6 +80,17 @@ class Version {
Status Get(const ReadOptions&, const LookupKey& key, std::string* val,
GetStats* stats);
// Adds "stats" into the current state. Returns true if a new
// compaction may need to be triggered, false otherwise.
// REQUIRES: lock is held
bool UpdateStats(const GetStats& stats);
// Record a sample of bytes read at the specified internal key.
// Samples are taken approximately once every config::kReadBytesPeriod
// bytes. Returns true if a new compaction may need to be triggered.
// REQUIRES: lock is held
bool RecordReadSample(Slice key);
// Reference count management (so Versions do not disappear out from
// under live iterators)
void Ref();
@@ -108,7 +125,16 @@ class Version {
friend class VersionSet;
class LevelFileNumIterator;
Iterator* NewConcatenatingIterator(const ReadOptions&, int level) const;
Iterator* NewConcatenatingIterator(const ReadOptions&, int level, uint64_t num) const;
// Call func(arg, level, f) for every file that overlaps user_key in
// order from newest to oldest. If an invocation of func returns
// false, makes no more calls.
//
// REQUIRES: user portion of internal_key == user_key.
void ForEachOverlapping(Slice user_key, Slice internal_key,
void* arg,
bool (*func)(void*, int, FileMetaData*));
VersionSet* vset_; // VersionSet to which this Version belongs
Version* next_; // Next version in linked list
@@ -118,13 +144,19 @@ class Version {
// List of files per level
std::vector<FileMetaData*> files_[config::kNumLevels];
// Next file to compact based on seek stats.
FileMetaData* file_to_compact_;
int file_to_compact_level_;
// Level that should be compacted next and its compaction score.
// Score < 1 means compaction is not strictly needed. These fields
// are initialized by Finalize().
double compaction_scores_[config::kNumLevels];
explicit Version(VersionSet* vset)
: vset_(vset), next_(this), prev_(this), refs_(0) {
: vset_(vset), next_(this), prev_(this), refs_(0),
file_to_compact_(NULL),
file_to_compact_level_(-1) {
for (int i = 0; i < config::kNumLevels; ++i) {
compaction_scores_[i] = -1;
}
@@ -202,13 +234,13 @@ class VersionSet {
// Pick level for a new compaction.
// Returns kNumLevels if there is no compaction to be done.
// Otherwise returns the lowest unlocked level that may compact upwards.
int PickCompactionLevel(bool* locked);
int PickCompactionLevel(bool* locked, bool seek_driven) const;
// Pick inputs for a new compaction at the specified level.
// Returns NULL if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
// describes the compaction. Caller should delete the result.
Compaction* PickCompaction(int level);
Compaction* PickCompaction(Version* v, int level);
// Return a compaction object for compacting the range [begin,end] in
// the specified level. Returns NULL if there is nothing in that
@@ -228,17 +260,8 @@ class VersionSet {
Iterator* MakeInputIterator(Compaction* c);
// Returns true iff some level needs a compaction.
bool NeedsCompaction(bool* levels) const {
Version* v = current_;
for (int i = 0; i + 1 < config::kNumLevels; ++i) {
if (!levels[i] && !levels[i + 1] &&
v->compaction_scores_[i] >= 1.0 &&
(i + 2 == config::kNumLevels ||
v->compaction_scores_[i + 1] < 1.0)) {
return true;
}
}
return false;
bool NeedsCompaction(bool* levels, bool seek_driven) const {
return PickCompactionLevel(levels, seek_driven) != config::kNumLevels;
}
// Add all files listed in any live version to *live.
@@ -273,7 +296,8 @@ class VersionSet {
InternalKey* smallest,
InternalKey* largest);
void GetCompactionBoundaries(int level,
void GetCompactionBoundaries(Version* version,
int level,
std::vector<FileMetaData*>* LA,
std::vector<FileMetaData*>* LB,
std::vector<uint64_t>* LA_sizes,

View File

@@ -0,0 +1,41 @@
package hyperleveldb
| source="hyperleveldb"
| debian name="libhyperleveldb0"
| version="@VERSION@"
| release="1"
| license="BSD"
| copyright="2011-2013 The LevelDB Authors"
| homepage="http://hyperdex.org"
| tarball="http://hyperdex.org/src/hyperleveldb-{version}.tar.gz"
| debian section="libs"
| configure="--disable-static"
| summary="A fast key-value storage library"
+ {libdir}/libhyperleveldb.so.0
+ {libdir}/libhyperleveldb.so.0.0.0
'''LevelDB is a fast key-value storage library written at Google that provides
an ordered mapping from string keys to string values.'''
subpackage hyperleveldb-devel
| debian name="libhyperleveldb-dev"
| debian section="libdevel"
| debian requires="libhyperleveldb0{self}"
| fedora requires="hyperleveldb{self}"
| summary="A fast key-value storage library (development files)"
+ {includedir}/hyperleveldb/cache.h
+ {includedir}/hyperleveldb/c.h
+ {includedir}/hyperleveldb/comparator.h
+ {includedir}/hyperleveldb/db.h
+ {includedir}/hyperleveldb/env.h
+ {includedir}/hyperleveldb/filter_policy.h
+ {includedir}/hyperleveldb/iterator.h
+ {includedir}/hyperleveldb/options.h
+ {includedir}/hyperleveldb/replay_iterator.h
+ {includedir}/hyperleveldb/slice.h
+ {includedir}/hyperleveldb/status.h
+ {includedir}/hyperleveldb/table_builder.h
+ {includedir}/hyperleveldb/table.h
+ {includedir}/hyperleveldb/write_batch.h
+ {libdir}/libhyperleveldb.so
+ {libdir}/pkgconfig/libhyperleveldb.pc
exclude {libdir}/libhyperleveldb.la

View File

@@ -5,6 +5,7 @@
#ifndef STORAGE_HYPERLEVELDB_INCLUDE_COMPARATOR_H_
#define STORAGE_HYPERLEVELDB_INCLUDE_COMPARATOR_H_
#include <stdint.h>
#include <string>
namespace hyperleveldb {
@@ -51,6 +52,9 @@ class Comparator {
// Simple comparator implementations may return with *key unchanged,
// i.e., an implementation of this method that does nothing is correct.
virtual void FindShortSuccessor(std::string* key) const = 0;
// If unsure, return 0;
virtual uint64_t KeyNum(const Slice& key) const;
};
// Return a builtin comparator that uses lexicographic byte-wise

View File

@@ -7,14 +7,16 @@
#include <stdint.h>
#include <stdio.h>
#include "iterator.h"
#include "options.h"
#include "replay_iterator.h"
namespace hyperleveldb {
// Update Makefile if you change these
static const int kMajorVersion = 1;
static const int kMinorVersion = 11;
static const int kMinorVersion = 13;
struct Options;
struct ReadOptions;
@@ -140,6 +142,36 @@ class DB {
// db->CompactRange(NULL, NULL);
virtual void CompactRange(const Slice* begin, const Slice* end) = 0;
// Create a live backup of a live LevelDB instance.
// The backup is stored in a directory named "backup-<name>" under the top
// level of the open LevelDB database. The implementation is permitted, and
// even encouraged, to improve the performance of this call through
// hard-links.
virtual Status LiveBackup(const Slice& name) = 0;
// Return an opaque timestamp that identifies the current point in time of the
// database. This timestamp may be subsequently presented to the
// NewReplayIterator method to create a ReplayIterator.
virtual void GetReplayTimestamp(std::string* timestamp) = 0;
// Set the lower bound for manual garbage collection. This method only takes
// effect when Options.manual_garbage_collection is true.
virtual void AllowGarbageCollectBeforeTimestamp(const std::string& timestamp) = 0;
// Validate the timestamp
virtual bool ValidateTimestamp(const std::string& timestamp) = 0;
// Compare two timestamps and return -1, 0, 1 for lt, eq, gt
virtual int CompareTimestamps(const std::string& lhs, const std::string& rhs) = 0;
// Return a ReplayIterator that returns every write operation performed after
// the timestamp.
virtual Status GetReplayIterator(const std::string& timestamp,
ReplayIterator** iter) = 0;
// Release a previously allocated replay iterator.
virtual void ReleaseReplayIterator(ReplayIterator* iter) = 0;
private:
// No copying allowed
DB(const DB&);

View File

@@ -94,6 +94,15 @@ class Env {
virtual Status RenameFile(const std::string& src,
const std::string& target) = 0;
// Copy file src to target.
virtual Status CopyFile(const std::string& src,
const std::string& target) = 0;
// Link file src to target.
virtual Status LinkFile(const std::string& src,
const std::string& target) = 0;
// Lock the specified file. Used to prevent concurrent access to
// the same db by multiple processes. On failure, stores NULL in
// *lock and returns non-OK.
@@ -306,6 +315,12 @@ class EnvWrapper : public Env {
Status RenameFile(const std::string& s, const std::string& t) {
return target_->RenameFile(s, t);
}
Status CopyFile(const std::string& s, const std::string& t) {
return target_->CopyFile(s, t);
}
Status LinkFile(const std::string& s, const std::string& t) {
return target_->LinkFile(s, t);
}
Status LockFile(const std::string& f, FileLock** l) {
return target_->LockFile(f, l);
}

View File

@@ -135,6 +135,17 @@ struct Options {
// Default: NULL
const FilterPolicy* filter_policy;
// Is the database used with the Replay mechanism? If yes, the lower bound on
// values to compact is (somewhat) left up to the application; if no, then
// LevelDB functions as usual, and uses snapshots to determine the lower
// bound. HyperLevelDB will always maintain the integrity of snapshots, so
// the application merely has the option to hold data as if it's holding a
// snapshot. This just prevents compaction from grabbing data before the app
// can get a snapshot.
//
// Default: false/no.
bool manual_garbage_collection;
// Create an Options object with default values for all fields.
Options();
};

View File

@@ -0,0 +1,57 @@
// Copyright (c) 2013 The HyperLevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef STORAGE_HYPERLEVELDB_INCLUDE_REPLAY_ITERATOR_H_
#define STORAGE_HYPERLEVELDB_INCLUDE_REPLAY_ITERATOR_H_
#include "slice.h"
#include "status.h"
namespace hyperleveldb {
// Abstract iterator over the stream of write operations (puts and deletes)
// performed after a replay timestamp.  Instances are obtained from
// DB::GetReplayIterator() and must be returned to the same DB via
// DB::ReleaseReplayIterator() (the destructor is protected to enforce this).
class ReplayIterator {
 public:
  ReplayIterator();

  // An iterator is either positioned at a deleted key, present key/value pair,
  // or not valid.  This method returns true iff the iterator is valid.
  virtual bool Valid() = 0;

  // Moves to the next entry in the source.  After this call, Valid() is
  // true iff the iterator was not positioned at the last entry in the source.
  // REQUIRES: Valid()
  virtual void Next() = 0;

  // Return true if the current entry points to a key-value pair.  If this
  // returns false, it means the current entry is a deleted entry.
  virtual bool HasValue() = 0;

  // Return the key for the current entry.  The underlying storage for
  // the returned slice is valid only until the next modification of
  // the iterator.
  // REQUIRES: Valid()
  virtual Slice key() const = 0;

  // Return the value for the current entry.  The underlying storage for
  // the returned slice is valid only until the next modification of
  // the iterator.
  // REQUIRES: !AtEnd() && !AtStart()
  // NOTE(review): AtEnd()/AtStart() are not declared on this interface;
  // presumably this requirement means Valid() && HasValue() -- confirm
  // against the implementation.
  virtual Slice value() const = 0;

  // If an error has occurred, return it.  Else return an ok status.
  virtual Status status() const = 0;

 protected:
  // must be released by giving it back to the DB
  virtual ~ReplayIterator();

 private:
  // No copying allowed
  ReplayIterator(const ReplayIterator&);
  void operator=(const ReplayIterator&);
};
} // namespace hyperleveldb

#endif // STORAGE_HYPERLEVELDB_INCLUDE_REPLAY_ITERATOR_H_

View File

@@ -1,3 +1,7 @@
// Copyright (c) 2013 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
// Test for issue 178: a manual compaction causes deleted data to reappear.
#include <iostream>
#include <sstream>

View File

@@ -0,0 +1,70 @@
// Test for issue 200: iterator reversal brings in keys newer than snapshots
#include <iostream>
#include <sstream>
#include <cstdlib>
#include "hyperleveldb/db.h"
#include "hyperleveldb/write_batch.h"
#include "util/testharness.h"
namespace {
class Issue200 { };
// Regression test for issue 200: a Put() performed after a snapshot was
// taken must not disturb reverse iteration (Prev/Next) on an iterator
// reading from that snapshot.
TEST(Issue200, Test) {
  // Get rid of any state from an old run.
  std::string dbpath = leveldb::test::TmpDir() + "/leveldb_200_iterator_test";
  DestroyDB(dbpath, leveldb::Options());

  // Open database.
  leveldb::DB* db;
  leveldb::Options db_options;
  db_options.create_if_missing = true;
  db_options.compression = leveldb::kNoCompression;
  ASSERT_OK(leveldb::DB::Open(db_options, dbpath, &db));

  // Seed five single-character keys "1".."5".
  leveldb::WriteOptions write_options;
  db->Put(write_options, leveldb::Slice("1", 1), leveldb::Slice("b", 1));
  db->Put(write_options, leveldb::Slice("2", 1), leveldb::Slice("c", 1));
  db->Put(write_options, leveldb::Slice("3", 1), leveldb::Slice("d", 1));
  db->Put(write_options, leveldb::Slice("4", 1), leveldb::Slice("e", 1));
  db->Put(write_options, leveldb::Slice("5", 1), leveldb::Slice("f", 1));

  // Pin the current state and open an iterator over it.
  const leveldb::Snapshot *snapshot = db->GetSnapshot();
  leveldb::ReadOptions read_options;
  read_options.snapshot = snapshot;
  leveldb::Iterator *iter = db->NewIterator(read_options);

  // Commenting out this Put changes the iterator behavior,
  // gives the expected behavior. This is unexpected because
  // the iterator is taken on a snapshot that was taken
  // before the Put
  db->Put(write_options, leveldb::Slice("25", 2), leveldb::Slice("cd", 2));

  // Walk backwards from "5"; the post-snapshot key "25" must be invisible.
  iter->Seek(leveldb::Slice("5", 1));
  ASSERT_EQ("5", iter->key().ToString());
  iter->Prev();
  ASSERT_EQ("4", iter->key().ToString());
  iter->Prev();
  ASSERT_EQ("3", iter->key().ToString());

  // At this point the iterator is at "3", I expect the Next() call will
  // move it to "4". But it stays on "3"
  iter->Next();
  ASSERT_EQ("4", iter->key().ToString());
  iter->Next();
  ASSERT_EQ("5", iter->key().ToString());

  // close database
  delete iter;
  db->ReleaseSnapshot(snapshot);
  delete db;
  DestroyDB(dbpath, leveldb::Options());
}
} // anonymous namespace
// Test-harness entry point: runs every TEST registered in this file.
int main(int argc, char** argv) {
  return leveldb::test::RunAllTests();
}

View File

@@ -4,15 +4,21 @@
#include <algorithm>
#include <stdint.h>
#include "../hyperleveldb/comparator.h"
#include "../hyperleveldb/slice.h"
#include "../port/port.h"
#include "coding.h"
#include "logging.h"
namespace hyperleveldb {
Comparator::~Comparator() { }
// Default implementation of the numeric key mapping: comparators that
// cannot (or do not wish to) map keys to numbers return 0 for every key,
// matching the "If unsure, return 0" contract in comparator.h.
uint64_t Comparator::KeyNum(const Slice& key) const {
  return 0;
}
namespace {
class BytewiseComparatorImpl : public Comparator {
public:
@@ -63,6 +69,22 @@ class BytewiseComparatorImpl : public Comparator {
}
// *key is a run of 0xffs. Leave it alone.
}
// Interpret the first 8 bytes of the key as a big-endian 64-bit number,
// zero-padding keys shorter than 8 bytes, so that numeric order matches
// bytewise key order.
virtual uint64_t KeyNum(const Slice& key) const {
  unsigned char prefix[sizeof(uint64_t)];
  memset(prefix, 0, sizeof(prefix));
  memmove(prefix, key.data(), std::min(key.size(), sizeof(uint64_t)));
  uint64_t number = 0;
  for (size_t i = 0; i < sizeof(uint64_t); ++i) {
    // Fold in one byte at a time, most-significant byte first.
    number = (number << 8) | static_cast<uint64_t>(prefix[i]);
  }
  return number;
}
};
} // namespace

View File

@@ -324,10 +324,42 @@ class PosixMmapFile : public WritableFile {
return s;
}
virtual Status Sync() {
// If this file is a MANIFEST, fsync() its containing directory so that the
// directory entry naming a freshly created manifest is durable on disk.
// For all other files this is a no-op and returns OK.
Status SyncDirIfManifest() {
  const char* f = filename_.c_str();
  const char* sep = strrchr(f, '/');
  Slice basename;
  std::string dir;
  if (sep == NULL) {
    // No directory component: the file lives in the current directory.
    dir = ".";
    basename = f;
  } else {
    dir = std::string(f, sep - f);
    basename = sep + 1;
  }
  Status s;
  if (basename.starts_with("MANIFEST")) {
    // Open the directory itself and fsync it; close() even on fsync failure.
    int fd = open(dir.c_str(), O_RDONLY);
    if (fd < 0) {
      s = IOError(dir, errno);
    } else {
      if (fsync(fd) < 0) {
        s = IOError(dir, errno);
      }
      close(fd);
    }
  }
  return s;
}
virtual Status Sync() {
// Ensure new files referred to by the manifest are in the filesystem.
Status s = SyncDirIfManifest();
bool need_sync = false;
if (!s.ok()) {
return s;
}
{
MutexLock l(&mtx_);
need_sync = sync_offset_ != end_offset_;
@@ -504,6 +536,54 @@ class PosixEnv : public Env {
return result;
}
// Copy the contents of src into target (created/truncated, mode 0644),
// returning the first error encountered.  On error, target may be left
// partially written.
virtual Status CopyFile(const std::string& src, const std::string& target) {
  Status result;
  // Initialize both descriptors to -1: the original left them
  // uninitialized, so the cleanup below read indeterminate values (and
  // could close an arbitrary descriptor) whenever an open() failed or
  // was skipped.
  int fd1 = -1;
  int fd2 = -1;
  if (result.ok() && (fd1 = open(src.c_str(), O_RDONLY)) < 0) {
    result = IOError(src, errno);
  }
  if (result.ok() && (fd2 = open(target.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644)) < 0) {
    result = IOError(target, errno);
  }
  ssize_t amt = 0;
  char buf[512];
  // Copy until EOF (read() == 0), a read error (amt < 0), or a short write.
  while (result.ok() && (amt = read(fd1, buf, 512)) > 0) {
    if (write(fd2, buf, amt) != amt) {
      result = IOError(src, errno);
    }
  }
  if (result.ok() && amt < 0) {
    result = IOError(src, errno);
  }
  // Close whichever descriptors were opened, preserving the first error.
  if (fd1 >= 0 && close(fd1) < 0) {
    if (result.ok()) {
      result = IOError(src, errno);
    }
  }
  if (fd2 >= 0 && close(fd2) < 0) {
    if (result.ok()) {
      result = IOError(target, errno);
    }
  }
  return result;
}
// Create a hard link named target referring to src via link(2).
// NOTE(review): on failure the error is attributed to src, although errno
// may describe a problem with target (e.g. it already exists) -- confirm
// this attribution is intended.
virtual Status LinkFile(const std::string& src, const std::string& target) {
  Status result;
  if (link(src.c_str(), target.c_str()) != 0) {
    result = IOError(src, errno);
  }
  return result;
}
virtual Status LockFile(const std::string& fname, FileLock** lock) {
*lock = NULL;
Status result;

View File

@@ -22,7 +22,8 @@ Options::Options()
block_size(4096),
block_restart_interval(16),
compression(kSnappyCompression),
filter_policy(NULL) {
filter_policy(NULL),
manual_garbage_collection(false) {
}

View File

@@ -16,7 +16,12 @@ class Random {
private:
uint32_t seed_;
public:
explicit Random(uint32_t s) : seed_(s & 0x7fffffffu) { }
// Construct with seed s, reduced into [0, 2^31).  With the multiplicative
// generator used by Next() (M = 2147483647, A = 16807), a state of 0 or of
// M itself would presumably produce a degenerate (all-zero) sequence, so
// both are remapped to 1.
explicit Random(uint32_t s) : seed_(s & 0x7fffffffu) {
  // Avoid bad seeds.
  if (seed_ == 0 || seed_ == 2147483647L) {
    seed_ = 1;
  }
}
uint32_t Next() {
static const uint32_t M = 2147483647L; // 2^31-1
static const uint64_t A = 16807; // bits 14, 8, 7, 5, 2, 1, 0