mirror of
https://github.com/XRPLF/rippled.git
synced 2025-12-06 17:27:55 +00:00
[RocksDB Performance Branch] Introduce MergeContext to Lazily Initialize merge operand list
Summary: In get operations, merge_operands is only used in few cases. Lazily initialize it can reduce average latency in some cases Test Plan: make all check Reviewers: haobo, kailiu, dhruba Reviewed By: haobo CC: igor, nkg-, leveldb Differential Revision: https://reviews.facebook.net/D14415
This commit is contained in:
@@ -16,6 +16,7 @@
|
||||
#include "db/log_reader.h"
|
||||
#include "db/log_writer.h"
|
||||
#include "db/memtable.h"
|
||||
#include "db/merge_context.h"
|
||||
#include "db/table_cache.h"
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/merge_operator.h"
|
||||
@@ -287,7 +288,8 @@ struct Saver {
|
||||
bool* value_found; // Is value set correctly? Used by KeyMayExist
|
||||
std::string* value;
|
||||
const MergeOperator* merge_operator;
|
||||
std::deque<std::string>* merge_operands; // the merge operations encountered
|
||||
// the merge operations encountered;
|
||||
MergeContext* merge_context;
|
||||
Logger* logger;
|
||||
bool didIO; // did we do any disk io?
|
||||
Statistics* statistics;
|
||||
@@ -309,10 +311,10 @@ static void MarkKeyMayExist(void* arg) {
|
||||
|
||||
static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
|
||||
Saver* s = reinterpret_cast<Saver*>(arg);
|
||||
std::deque<std::string>* const ops = s->merge_operands; // shorter alias
|
||||
MergeContext* merge_contex = s->merge_context;
|
||||
std::string merge_result; // temporary area for merge results later
|
||||
|
||||
assert(s != nullptr && ops != nullptr);
|
||||
assert(s != nullptr && merge_contex != nullptr);
|
||||
|
||||
ParsedInternalKey parsed_key;
|
||||
// TODO: didIO and Merge?
|
||||
@@ -331,7 +333,8 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
|
||||
} else if (kMerge == s->state) {
|
||||
assert(s->merge_operator != nullptr);
|
||||
s->state = kFound;
|
||||
if (!s->merge_operator->FullMerge(s->user_key, &v, *ops,
|
||||
if (!s->merge_operator->FullMerge(s->user_key, &v,
|
||||
merge_contex->GetOperands(),
|
||||
s->value, s->logger)) {
|
||||
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
|
||||
s->state = kCorrupt;
|
||||
@@ -346,8 +349,9 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
|
||||
s->state = kDeleted;
|
||||
} else if (kMerge == s->state) {
|
||||
s->state = kFound;
|
||||
if (!s->merge_operator->FullMerge(s->user_key, nullptr, *ops,
|
||||
s->value, s->logger)) {
|
||||
if (!s->merge_operator->FullMerge(s->user_key, nullptr,
|
||||
merge_contex->GetOperands(),
|
||||
s->value, s->logger)) {
|
||||
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
|
||||
s->state = kCorrupt;
|
||||
}
|
||||
@@ -359,16 +363,15 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
|
||||
case kTypeMerge:
|
||||
assert(s->state == kNotFound || s->state == kMerge);
|
||||
s->state = kMerge;
|
||||
ops->push_front(v.ToString());
|
||||
while (ops->size() >= 2) {
|
||||
merge_contex->PushOperand(v);
|
||||
while (merge_contex->GetNumOperands() >= 2) {
|
||||
// Attempt to merge operands together via user associateive merge
|
||||
if (s->merge_operator->PartialMerge(s->user_key,
|
||||
Slice((*ops)[0]),
|
||||
Slice((*ops)[1]),
|
||||
merge_contex->GetOperand(0),
|
||||
merge_contex->GetOperand(1),
|
||||
&merge_result,
|
||||
s->logger)) {
|
||||
ops->pop_front();
|
||||
swap(ops->front(), merge_result);
|
||||
merge_contex->PushPartialMergeResult(merge_result);
|
||||
} else {
|
||||
// Associative merge returns false ==> stack the operands
|
||||
break;
|
||||
@@ -417,7 +420,7 @@ void Version::Get(const ReadOptions& options,
|
||||
const LookupKey& k,
|
||||
std::string* value,
|
||||
Status* status,
|
||||
std::deque<std::string>* operands,
|
||||
MergeContext* merge_context,
|
||||
GetStats* stats,
|
||||
const Options& db_options,
|
||||
bool* value_found) {
|
||||
@@ -436,7 +439,7 @@ void Version::Get(const ReadOptions& options,
|
||||
saver.value_found = value_found;
|
||||
saver.value = value;
|
||||
saver.merge_operator = merge_operator;
|
||||
saver.merge_operands = operands;
|
||||
saver.merge_context = merge_context;
|
||||
saver.logger = logger.get();
|
||||
saver.didIO = false;
|
||||
saver.statistics = db_options.statistics.get();
|
||||
@@ -564,7 +567,8 @@ void Version::Get(const ReadOptions& options,
|
||||
if (kMerge == saver.state) {
|
||||
// merge_operands are in saver and we hit the beginning of the key history
|
||||
// do a final merge of nullptr and operands;
|
||||
if (merge_operator->FullMerge(user_key, nullptr, *saver.merge_operands,
|
||||
if (merge_operator->FullMerge(user_key, nullptr,
|
||||
saver.merge_context->GetOperands(),
|
||||
value, logger.get())) {
|
||||
*status = Status::OK();
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user