Upgrade LevelDB to 1.10.0, mostly for better write stall logging.

This commit is contained in:
JoelKatz
2013-05-15 17:32:44 -07:00
parent b00ac64941
commit afbaa46924
10 changed files with 95 additions and 21 deletions

View File

@@ -310,16 +310,24 @@ Status DBImpl::Recover(VersionEdit* edit) {
if (!s.ok()) {
return s;
}
std::set<uint64_t> expected;
versions_->AddLiveFiles(&expected);
uint64_t number;
FileType type;
std::vector<uint64_t> logs;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)
&& type == kLogFile
&& ((number >= min_log) || (number == prev_log))) {
if (ParseFileName(filenames[i], &number, &type)) {
expected.erase(number);
if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
logs.push_back(number);
}
}
if (!expected.empty()) {
char buf[50];
snprintf(buf, sizeof(buf), "%d missing files; e.g.",
static_cast<int>(expected.size()));
return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
}
// Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end());
@@ -1268,10 +1276,11 @@ Status DBImpl::MakeRoomForWrite(bool force) {
} else if (imm_ != NULL) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
Log(options_.info_log, "Current memtable full; waiting...\n");
bg_cv_.Wait();
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
// There are too many level-0 files.
Log(options_.info_log, "waiting...\n");
Log(options_.info_log, "Too many L0 files; waiting...\n");
bg_cv_.Wait();
} else {
// Attempt to switch to a new memtable and trigger compaction of old

View File

@@ -461,6 +461,20 @@ class DBTest {
}
return result;
}
bool DeleteAnSSTFile() {
std::vector<std::string> filenames;
ASSERT_OK(env_->GetChildren(dbname_, &filenames));
uint64_t number;
FileType type;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type) && type == kTableFile) {
ASSERT_OK(env_->DeleteFile(TableFileName(dbname_, number)));
return true;
}
}
return false;
}
};
TEST(DBTest, Empty) {
@@ -1567,6 +1581,23 @@ TEST(DBTest, ManifestWriteError) {
}
}
TEST(DBTest, MissingSSTFile) {
ASSERT_OK(Put("foo", "bar"));
ASSERT_EQ("bar", Get("foo"));
// Dump the memtable to disk.
dbfull()->TEST_CompactMemTable();
ASSERT_EQ("bar", Get("foo"));
ASSERT_TRUE(DeleteAnSSTFile());
Options options = CurrentOptions();
options.paranoid_checks = true;
Status s = TryReopen(&options);
ASSERT_TRUE(!s.ok());
ASSERT_TRUE(s.ToString().find("issing") != std::string::npos)
<< s.ToString();
}
TEST(DBTest, FilesDeletedAfterCompaction) {
ASSERT_OK(Put("foo", "v2"));
Compact("a", "z");

View File

@@ -26,7 +26,7 @@ std::string ParsedInternalKey::DebugString() const {
(unsigned long long) sequence,
int(type));
std::string result = "'";
result += user_key.ToString();
result += EscapeString(user_key.ToString());
result += buf;
return result;
}

View File

@@ -14,7 +14,7 @@ namespace leveldb {
// Update Makefile if you change these
static const int kMajorVersion = 1;
static const int kMinorVersion = 9;
static const int kMinorVersion = 10;
struct Options;
struct ReadOptions;

View File

@@ -16,7 +16,7 @@
namespace leveldb {
inline uint32_t Block::NumRestarts() const {
assert(size_ >= 2*sizeof(uint32_t));
assert(size_ >= sizeof(uint32_t));
return DecodeFixed32(data_ + size_ - sizeof(uint32_t));
}
@@ -27,11 +27,12 @@ Block::Block(const BlockContents& contents)
if (size_ < sizeof(uint32_t)) {
size_ = 0; // Error marker
} else {
restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32_t);
if (restart_offset_ > size_ - sizeof(uint32_t)) {
// The size is too small for NumRestarts() and therefore
// restart_offset_ wrapped around.
size_t max_restarts_allowed = (size_-sizeof(uint32_t)) / sizeof(uint32_t);
if (NumRestarts() > max_restarts_allowed) {
// The size is too small for NumRestarts()
size_ = 0;
} else {
restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32_t);
}
}
}
@@ -253,7 +254,7 @@ class Block::Iter : public Iterator {
};
Iterator* Block::NewIterator(const Comparator* cmp) {
if (size_ < 2*sizeof(uint32_t)) {
if (size_ < sizeof(uint32_t)) {
return NewErrorIterator(Status::Corruption("bad block contents"));
}
const uint32_t num_restarts = NumRestarts();

View File

@@ -228,7 +228,6 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k,
!filter->KeyMayMatch(handle.offset(), k)) {
// Not found
} else {
Slice handle = iiter->value();
Iterator* block_iter = BlockReader(this, options, iiter->value());
block_iter->Seek(k);
if (block_iter->Valid()) {

View File

@@ -644,6 +644,36 @@ class Harness {
Constructor* constructor_;
};
// Test empty table/block.
TEST(Harness, Empty) {
for (int i = 0; i < kNumTestArgs; i++) {
Init(kTestArgList[i]);
Random rnd(test::RandomSeed() + 1);
Test(&rnd);
}
}
// Special test for a block with no restart entries. The C++ leveldb
// code never generates such blocks, but the Java version of leveldb
// seems to.
TEST(Harness, ZeroRestartPointsInBlock) {
char data[sizeof(uint32_t)];
memset(data, 0, sizeof(data));
BlockContents contents;
contents.data = Slice(data, sizeof(data));
contents.cachable = false;
contents.heap_allocated = false;
Block block(contents);
Iterator* iter = block.NewIterator(BytewiseComparator());
iter->SeekToFirst();
ASSERT_TRUE(!iter->Valid());
iter->SeekToLast();
ASSERT_TRUE(!iter->Valid());
iter->Seek("foo");
ASSERT_TRUE(!iter->Valid());
delete iter;
}
// Test the empty key
TEST(Harness, SimpleEmptyKey) {
for (int i = 0; i < kNumTestArgs; i++) {

View File

@@ -116,7 +116,6 @@ class HandleTable {
LRUHandle* h = list_[i];
while (h != NULL) {
LRUHandle* next = h->next_hash;
Slice key = h->key();
uint32_t hash = h->hash;
LRUHandle** ptr = &new_list[hash & (new_length - 1)];
h->next_hash = *ptr;
@@ -160,7 +159,6 @@ class LRUCache {
// mutex_ protects the following state.
port::Mutex mutex_;
size_t usage_;
uint64_t last_id_;
// Dummy head of LRU list.
// lru.prev is newest entry, lru.next is oldest entry.
@@ -170,8 +168,7 @@ class LRUCache {
};
LRUCache::LRUCache()
: usage_(0),
last_id_(0) {
: usage_(0) {
// Make empty circular linked list
lru_.next = &lru_;
lru_.prev = &lru_;

View File

@@ -386,7 +386,7 @@ class PosixEnv : public Env {
PosixEnv();
virtual ~PosixEnv() {
fprintf(stderr, "Destroying Env::Default()\n");
exit(1);
abort();
}
virtual Status NewSequentialFile(const std::string& fname,
@@ -589,7 +589,7 @@ class PosixEnv : public Env {
void PthreadCall(const char* label, int result) {
if (result != 0) {
fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
exit(1);
abort();
}
}

View File

@@ -6,6 +6,13 @@
#include "util/coding.h"
#include "util/hash.h"
// The FALLTHROUGH_INTENDED macro can be used to annotate implicit fall-through
// between switch labels. The real definition should be provided externally.
// This one is a fallback version for unsupported compilers.
#ifndef FALLTHROUGH_INTENDED
#define FALLTHROUGH_INTENDED do { } while (0)
#endif
namespace leveldb {
uint32_t Hash(const char* data, size_t n, uint32_t seed) {
@@ -28,10 +35,10 @@ uint32_t Hash(const char* data, size_t n, uint32_t seed) {
switch (limit - data) {
case 3:
h += data[2] << 16;
// fall through
FALLTHROUGH_INTENDED;
case 2:
h += data[1] << 8;
// fall through
FALLTHROUGH_INTENDED;
case 1:
h += data[0];
h *= m;