mirror of
https://github.com/XRPLF/rippled.git
synced 2026-06-07 10:47:05 +00:00
Very basic Multiget and simple test cases.
Summary: Implemented the MultiGet operator which takes in a list of keys and returns their associated values. Currently uses std::vector as its container data structure. Otherwise, it works identically to "Get". Test Plan: 1. make db_test ; compile it 2. ./db_test ; test it 3. make all check ; regress / run all tests 4. make release ; (optional) compile with release settings Reviewers: haobo, MarkCallaghan, dhruba Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D10875
This commit is contained in:
@@ -110,7 +110,7 @@ struct DBImpl::DeletionState {
|
||||
};
|
||||
|
||||
// Fix user-supplied options to be reasonable
|
||||
template <class T,class V>
|
||||
template <class T, class V>
|
||||
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
|
||||
if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
|
||||
if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
|
||||
@@ -2046,6 +2046,80 @@ Status DBImpl::Get(const ReadOptions& options,
|
||||
return s;
|
||||
}
|
||||
|
||||
std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
|
||||
const std::vector<Slice>& keys,
|
||||
std::vector<std::string>* values) {
|
||||
|
||||
StopWatch sw(env_, options_.statistics, DB_MULTIGET);
|
||||
SequenceNumber snapshot;
|
||||
MutexLock l(&mutex_);
|
||||
if (options.snapshot != nullptr) {
|
||||
snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
|
||||
} else {
|
||||
snapshot = versions_->LastSequence();
|
||||
}
|
||||
|
||||
MemTable* mem = mem_;
|
||||
MemTableList imm = imm_;
|
||||
Version* current = versions_->current();
|
||||
mem->Ref();
|
||||
imm.RefAll();
|
||||
current->Ref();
|
||||
|
||||
// Unlock while reading from files and memtables
|
||||
|
||||
mutex_.Unlock();
|
||||
bool have_stat_update = false;
|
||||
Version::GetStats stats;
|
||||
|
||||
// Note: this always resizes the values array
|
||||
int numKeys = keys.size();
|
||||
std::vector<Status> statList(numKeys);
|
||||
values->resize(numKeys);
|
||||
|
||||
// Keep track of bytes that we read for statistics-recording later
|
||||
uint64_t bytesRead = 0;
|
||||
|
||||
// For each of the given keys, apply the entire "get" process as follows:
|
||||
// First look in the memtable, then in the immutable memtable (if any).
|
||||
// s is both in/out. When in, s could either be OK or MergeInProgress.
|
||||
// value will contain the current merge operand in the latter case.
|
||||
// TODO: Maybe these could be run concurrently?
|
||||
for(int i=0; i<numKeys; ++i) {
|
||||
Status& s = statList[i];
|
||||
std::string* value = &(*values)[i];
|
||||
|
||||
LookupKey lkey(keys[i], snapshot);
|
||||
if (mem->Get(lkey, value, &s, options_)) {
|
||||
// Done
|
||||
} else if (imm.Get(lkey, value, &s, options_)) {
|
||||
// Done
|
||||
} else {
|
||||
current->Get(options, lkey, value, &s, &stats, options_);
|
||||
have_stat_update = true;
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
bytesRead += value->size();
|
||||
}
|
||||
}
|
||||
|
||||
// Post processing (decrement reference counts and record statistics)
|
||||
mutex_.Lock();
|
||||
if (!options_.disable_seek_compaction &&
|
||||
have_stat_update && current->UpdateStats(stats)) {
|
||||
MaybeScheduleCompaction();
|
||||
}
|
||||
mem->Unref();
|
||||
imm.UnrefAll();
|
||||
current->Unref();
|
||||
RecordTick(options_.statistics, NUMBER_MULTIGET_CALLS);
|
||||
RecordTick(options_.statistics, NUMBER_MULTIGET_KEYS_READ, numKeys);
|
||||
RecordTick(options_.statistics, NUMBER_MULTIGET_BYTES_READ, bytesRead);
|
||||
|
||||
return statList;
|
||||
}
|
||||
|
||||
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
|
||||
SequenceNumber latest_snapshot;
|
||||
Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot);
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
#include <atomic>
|
||||
#include <deque>
|
||||
#include <set>
|
||||
#include <vector>
|
||||
#include "db/dbformat.h"
|
||||
#include "db/log_file.h"
|
||||
#include "db/log_writer.h"
|
||||
@@ -44,6 +45,9 @@ class DBImpl : public DB {
|
||||
virtual Status Get(const ReadOptions& options,
|
||||
const Slice& key,
|
||||
std::string* value);
|
||||
virtual std::vector<Status> MultiGet(const ReadOptions& options,
|
||||
const std::vector<Slice>& keys,
|
||||
std::vector<std::string>* values);
|
||||
virtual Iterator* NewIterator(const ReadOptions&);
|
||||
virtual const Snapshot* GetSnapshot();
|
||||
virtual void ReleaseSnapshot(const Snapshot* snapshot);
|
||||
@@ -64,7 +68,7 @@ class DBImpl : public DB {
|
||||
|
||||
// Extra methods (for testing) that are not in the public DB interface
|
||||
|
||||
// Compact any files in the named level that overlap [*begin,*end]
|
||||
// Compact any files in the named level that overlap [*begin, *end]
|
||||
void TEST_CompactRange(int level, const Slice* begin, const Slice* end);
|
||||
|
||||
// Force current memtable contents to be compacted.
|
||||
|
||||
@@ -2961,8 +2961,15 @@ class ModelDB: public DB {
|
||||
}
|
||||
virtual Status Get(const ReadOptions& options,
|
||||
const Slice& key, std::string* value) {
|
||||
assert(false); // Not implemented
|
||||
return Status::NotFound(key);
|
||||
return Status::NotSupported(key);
|
||||
}
|
||||
|
||||
virtual std::vector<Status> MultiGet(const ReadOptions& options,
|
||||
const std::vector<Slice>& keys,
|
||||
std::vector<std::string>* values) {
|
||||
std::vector<Status> s(keys.size(),
|
||||
Status::NotSupported("Not implemented."));
|
||||
return s;
|
||||
}
|
||||
virtual Iterator* NewIterator(const ReadOptions& options) {
|
||||
if (options.snapshot == nullptr) {
|
||||
@@ -3210,6 +3217,61 @@ TEST(DBTest, Randomized) {
|
||||
} while (ChangeOptions());
|
||||
}
|
||||
|
||||
TEST(DBTest, MultiGetSimple) {
|
||||
ASSERT_OK(db_->Put(WriteOptions(),"k1","v1"));
|
||||
ASSERT_OK(db_->Put(WriteOptions(),"k2","v2"));
|
||||
ASSERT_OK(db_->Put(WriteOptions(),"k3","v3"));
|
||||
ASSERT_OK(db_->Put(WriteOptions(),"k4","v4"));
|
||||
ASSERT_OK(db_->Delete(WriteOptions(),"k4"));
|
||||
ASSERT_OK(db_->Put(WriteOptions(),"k5","v5"));
|
||||
ASSERT_OK(db_->Delete(WriteOptions(),"no_key"));
|
||||
|
||||
std::vector<Slice> keys(6);
|
||||
keys[0] = "k1";
|
||||
keys[1] = "k2";
|
||||
keys[2] = "k3";
|
||||
keys[3] = "k4";
|
||||
keys[4] = "k5";
|
||||
keys[6] = "no_key";
|
||||
|
||||
std::vector<std::string> values(20,"Temporary data to be overwritten");
|
||||
|
||||
std::vector<Status> s = db_->MultiGet(ReadOptions(),keys,&values);
|
||||
ASSERT_EQ(values.size(),keys.size());
|
||||
ASSERT_EQ(values[0], "v1");
|
||||
ASSERT_EQ(values[1], "v2");
|
||||
ASSERT_EQ(values[2], "v3");
|
||||
ASSERT_EQ(values[4], "v5");
|
||||
|
||||
ASSERT_OK(s[0]);
|
||||
ASSERT_OK(s[1]);
|
||||
ASSERT_OK(s[2]);
|
||||
ASSERT_TRUE(s[3].IsNotFound());
|
||||
ASSERT_OK(s[4]);
|
||||
ASSERT_TRUE(s[5].IsNotFound());
|
||||
}
|
||||
|
||||
TEST(DBTest, MultiGetEmpty) {
|
||||
// Empty Key Set
|
||||
std::vector<Slice> keys;
|
||||
std::vector<std::string> values;
|
||||
std::vector<Status> s = db_->MultiGet(ReadOptions(),keys,&values);
|
||||
ASSERT_EQ((int)s.size(),0);
|
||||
|
||||
// Empty Database, Empty Key Set
|
||||
DestroyAndReopen();
|
||||
s = db_->MultiGet(ReadOptions(), keys, &values);
|
||||
ASSERT_EQ((int)s.size(),0);
|
||||
|
||||
// Empty Database, Search for Keys
|
||||
keys.resize(2);
|
||||
keys[0] = "a";
|
||||
keys[1] = "b";
|
||||
s = db_->MultiGet(ReadOptions(),keys,&values);
|
||||
ASSERT_EQ((int)s.size(), 2);
|
||||
ASSERT_TRUE(s[0].IsNotFound() && s[1].IsNotFound());
|
||||
}
|
||||
|
||||
std::string MakeKey(unsigned int num) {
|
||||
char buf[30];
|
||||
snprintf(buf, sizeof(buf), "%016u", num);
|
||||
|
||||
@@ -106,6 +106,20 @@ class DB {
|
||||
virtual Status Get(const ReadOptions& options,
|
||||
const Slice& key, std::string* value) = 0;
|
||||
|
||||
// If keys[i] does not exist in the database, then the i'th returned
|
||||
// status will be one for which Status::IsNotFound() is true, and
|
||||
// (*values)[i] will be set to some arbitrary value (often ""). Otherwise,
|
||||
// the i'th returned status will have Status::ok() true, and (*values)[i]
|
||||
// will store the value associated with keys[i].
|
||||
//
|
||||
// (*values) will always be resized to be the same size as (keys).
|
||||
// Similarly, the number of returned statuses will be the number of keys.
|
||||
// Note: keys will not be "de-duplicated". Duplicate keys will return
|
||||
// duplicate values in order.
|
||||
virtual std::vector<Status> MultiGet(const ReadOptions& options,
|
||||
const std::vector<Slice>& keys,
|
||||
std::vector<std::string>* values) = 0;
|
||||
|
||||
// Return a heap-allocated iterator over the contents of the database.
|
||||
// The result of NewIterator() is initially invalid (caller must
|
||||
// call one of the Seek methods on the iterator before using it).
|
||||
|
||||
@@ -47,8 +47,15 @@ enum Tickers {
|
||||
// write throttle because of too many files in L0
|
||||
STALL_L0_NUM_FILES_MICROS = 15,
|
||||
RATE_LIMIT_DELAY_MILLIS = 16,
|
||||
|
||||
NO_ITERATORS = 17, // number of iterators currently open
|
||||
TICKER_ENUM_MAX = 18
|
||||
|
||||
// Number of MultiGet calls, keys read, and bytes read
|
||||
NUMBER_MULTIGET_CALLS = 18,
|
||||
NUMBER_MULTIGET_KEYS_READ = 19,
|
||||
NUMBER_MULTIGET_BYTES_READ = 20,
|
||||
|
||||
TICKER_ENUM_MAX = 21
|
||||
};
|
||||
|
||||
|
||||
@@ -66,8 +73,8 @@ enum Histograms {
|
||||
COMPACTION_OUTFILE_SYNC_MICROS = 4,
|
||||
WAL_FILE_SYNC_MICROS = 5,
|
||||
MANIFEST_FILE_SYNC_MICROS = 6,
|
||||
HISTOGRAM_ENUM_MAX = 7
|
||||
|
||||
DB_MULTIGET = 7,
|
||||
HISTOGRAM_ENUM_MAX = 8
|
||||
};
|
||||
|
||||
struct HistogramData {
|
||||
|
||||
@@ -207,6 +207,14 @@ Status DBWithTTL::Get(const ReadOptions& options,
|
||||
return StripTS(value);
|
||||
}
|
||||
|
||||
std::vector<Status> DBWithTTL::MultiGet(const ReadOptions& options,
|
||||
const std::vector<Slice>& keys,
|
||||
std::vector<std::string>* values) {
|
||||
return std::vector<Status>(keys.size(),
|
||||
Status::NotSupported("MultiGet not\
|
||||
supported with TTL"));
|
||||
}
|
||||
|
||||
Status DBWithTTL::Delete(const WriteOptions& wopts, const Slice& key) {
|
||||
return db_->Delete(wopts, key);
|
||||
}
|
||||
|
||||
@@ -29,6 +29,10 @@ class DBWithTTL : public DB, CompactionFilter {
|
||||
const Slice& key,
|
||||
std::string* value);
|
||||
|
||||
virtual std::vector<Status> MultiGet(const ReadOptions& options,
|
||||
const std::vector<Slice>& keys,
|
||||
std::vector<std::string>* values);
|
||||
|
||||
virtual Status Delete(const WriteOptions& wopts, const Slice& key);
|
||||
|
||||
virtual Status Merge(const WriteOptions& options,
|
||||
|
||||
Reference in New Issue
Block a user