mirror of
https://github.com/XRPLF/rippled.git
synced 2025-12-06 17:27:55 +00:00
[RocksDB] CompactionFilter cleanup
Summary: - removed the compaction_filter_value from the callback interface. Restrict compaction filter to purging values. - modify some comments to reflect curent status. Test Plan: make check Reviewers: dhruba Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D10335
This commit is contained in:
@@ -1590,6 +1590,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
|
|||||||
SequenceNumber last_sequence_for_key __attribute__((unused)) =
|
SequenceNumber last_sequence_for_key __attribute__((unused)) =
|
||||||
kMaxSequenceNumber;
|
kMaxSequenceNumber;
|
||||||
SequenceNumber visible_in_snapshot = kMaxSequenceNumber;
|
SequenceNumber visible_in_snapshot = kMaxSequenceNumber;
|
||||||
|
std::string compaction_filter_value;
|
||||||
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
|
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
|
||||||
// Prioritize immutable compaction work
|
// Prioritize immutable compaction work
|
||||||
if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) {
|
if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) {
|
||||||
@@ -1605,7 +1606,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
|
|||||||
|
|
||||||
Slice key = input->key();
|
Slice key = input->key();
|
||||||
Slice value = input->value();
|
Slice value = input->value();
|
||||||
Slice* compaction_filter_value = nullptr;
|
|
||||||
if (compact->compaction->ShouldStopBefore(key) &&
|
if (compact->compaction->ShouldStopBefore(key) &&
|
||||||
compact->builder != nullptr) {
|
compact->builder != nullptr) {
|
||||||
status = FinishCompactionOutputFile(compact, input.get());
|
status = FinishCompactionOutputFile(compact, input.get());
|
||||||
@@ -1664,20 +1665,24 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
|
|||||||
ikey.type != kTypeDeletion &&
|
ikey.type != kTypeDeletion &&
|
||||||
ikey.sequence < earliest_snapshot) {
|
ikey.sequence < earliest_snapshot) {
|
||||||
// If the user has specified a compaction filter, then invoke
|
// If the user has specified a compaction filter, then invoke
|
||||||
// it. If this key is not visible via any snapshot and the
|
// it. If the return value of the compaction filter is true,
|
||||||
// return value of the compaction filter is true and then
|
|
||||||
// drop this key from the output.
|
// drop this key from the output.
|
||||||
|
bool value_changed = false;
|
||||||
|
compaction_filter_value.clear();
|
||||||
drop = options_.CompactionFilter(options_.compaction_filter_args,
|
drop = options_.CompactionFilter(options_.compaction_filter_args,
|
||||||
compact->compaction->level(),
|
compact->compaction->level(),
|
||||||
ikey.user_key, value, &compaction_filter_value);
|
ikey.user_key, value,
|
||||||
|
&compaction_filter_value,
|
||||||
|
&value_changed);
|
||||||
|
// Another example of statistics update without holding the lock
|
||||||
|
// TODO: clean it up
|
||||||
if (drop) {
|
if (drop) {
|
||||||
RecordTick(options_.statistics, COMPACTION_KEY_DROP_USER);
|
RecordTick(options_.statistics, COMPACTION_KEY_DROP_USER);
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the application wants to change the value, then do so here.
|
// If the application wants to change the value, then do so here.
|
||||||
if (compaction_filter_value != nullptr) {
|
if (value_changed) {
|
||||||
value = *compaction_filter_value;
|
value = compaction_filter_value;
|
||||||
delete compaction_filter_value;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1351,22 +1351,26 @@ TEST(DBTest, RepeatedWritesToSameKey) {
|
|||||||
static int cfilter_count;
|
static int cfilter_count;
|
||||||
static std::string NEW_VALUE = "NewValue";
|
static std::string NEW_VALUE = "NewValue";
|
||||||
static bool keep_filter(void* arg, int level, const Slice& key,
|
static bool keep_filter(void* arg, int level, const Slice& key,
|
||||||
const Slice& value, Slice** new_value) {
|
const Slice& value, std::string* new_value,
|
||||||
|
bool* value_changed) {
|
||||||
assert(arg == nullptr);
|
assert(arg == nullptr);
|
||||||
cfilter_count++;
|
cfilter_count++;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
static bool delete_filter(void*argv, int level, const Slice& key,
|
static bool delete_filter(void*argv, int level, const Slice& key,
|
||||||
const Slice& value, Slice** new_value) {
|
const Slice& value, std::string* new_value,
|
||||||
|
bool* value_changed) {
|
||||||
assert(argv == nullptr);
|
assert(argv == nullptr);
|
||||||
cfilter_count++;
|
cfilter_count++;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
static bool change_filter(void*argv, int level, const Slice& key,
|
static bool change_filter(void*argv, int level, const Slice& key,
|
||||||
const Slice& value, Slice** new_value) {
|
const Slice& value, std::string* new_value,
|
||||||
|
bool* value_changed) {
|
||||||
assert(argv == (void*)100);
|
assert(argv == (void*)100);
|
||||||
assert(new_value != nullptr);
|
assert(new_value != nullptr);
|
||||||
*new_value = new Slice(NEW_VALUE);
|
*new_value = NEW_VALUE;
|
||||||
|
*value_changed = true;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -356,22 +356,25 @@ struct Options {
|
|||||||
|
|
||||||
// This method allows an application to modify/delete a key-value at
|
// This method allows an application to modify/delete a key-value at
|
||||||
// the time of compaction. The compaction process invokes this
|
// the time of compaction. The compaction process invokes this
|
||||||
// method for every kv that is being compacted. A return value
|
// method for kv that is being compacted. A return value
|
||||||
// of false indicates that the kv should be preserved in the
|
// of false indicates that the kv should be preserved in the
|
||||||
// output of this compaction run and a return value of true
|
// output of this compaction run and a return value of true
|
||||||
// indicates that this key-value should be removed from the
|
// indicates that this key-value should be removed from the
|
||||||
// output of the compaction. The application can inspect
|
// output of the compaction. The application can inspect
|
||||||
// the existing value of the key, modify it if needed and
|
// the existing value of the key and make decision based on it.
|
||||||
// return back the new value for this key. The application
|
|
||||||
// should allocate memory for the Slice object that is used to
|
// When the value is to be preserved, the application has the option
|
||||||
// return the new value and the leveldb framework will
|
// to modify the existing_value and pass it back through new_value.
|
||||||
// free up that memory.
|
// value_changed needs to be set to true in this case.
|
||||||
|
|
||||||
// The compaction_filter_args, if specified here, are passed
|
// The compaction_filter_args, if specified here, are passed
|
||||||
// back to the invocation of the CompactionFilter.
|
// back to the invocation of the CompactionFilter.
|
||||||
void* compaction_filter_args;
|
void* compaction_filter_args;
|
||||||
bool (*CompactionFilter)(void* compaction_filter_args,
|
bool (*CompactionFilter)(void* compaction_filter_args,
|
||||||
int level, const Slice& key,
|
int level, const Slice& key,
|
||||||
const Slice& existing_value, Slice** new_value);
|
const Slice& existing_value,
|
||||||
|
std::string* new_value,
|
||||||
|
bool* value_changed);
|
||||||
|
|
||||||
// Disable automatic compactions. Manual compactions can still
|
// Disable automatic compactions. Manual compactions can still
|
||||||
// be issued on this database.
|
// be issued on this database.
|
||||||
|
|||||||
Reference in New Issue
Block a user