Ability to invoke application hook for every key during compaction.
Summary:
There are certain use cases where the application intends to
delete older keys after a certain time period has expired.
One option for such applications is to periodically scan the
entire database and delete the appropriate keys.

A better way is to allow the application to hook into the
compaction process. This patch allows the application to
register a callback that is invoked for every key being
compacted. If this callback returns true, then the key is not
preserved in the output of the compaction.
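
As an illustration of the expired-key use case (not part of this
patch), a filter might encode a write timestamp in each value and
drop keys older than some TTL. The 8-byte timestamp prefix below is
a hypothetical encoding chosen for this sketch:

    #include <stdint.h>
    #include <string.h>
    #include <time.h>
    #include "leveldb/options.h"
    #include "leveldb/slice.h"

    // Hypothetical TTL: drop keys written more than a week ago.
    static const uint64_t kTtlSeconds = 7 * 24 * 60 * 60;

    static bool TtlFilter(int level, const leveldb::Slice& key,
                          const leveldb::Slice& existing_value,
                          leveldb::Slice** new_value) {
      uint64_t write_time;
      if (existing_value.size() < sizeof(write_time)) {
        return false;  // malformed value; keep rather than drop silently
      }
      memcpy(&write_time, existing_value.data(), sizeof(write_time));
      // Returning true removes the key from the compaction output.
      return (uint64_t)time(NULL) - write_time > kTtlSeconds;
    }

    // Installed via the new option:
    //   leveldb::Options options;
    //   options.CompactionFilter = TtlFilter;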

Test Plan:
This is mostly to preview the proposed new public API.
Since it is a public API, please do due diligence when reviewing it.

I will be writing test cases for this API in my next version of
this patch.

Reviewers: MarkCallaghan, heyongqiang

Reviewed By: heyongqiang

CC: sheki, adsharma

Differential Revision: https://reviews.facebook.net/D6285
dhruba committed Nov 6, 2012
1 parent f1a7c73 commit 5273c81
Showing 4 changed files with 207 additions and 2 deletions.
19 changes: 18 additions & 1 deletion db/db_impl.cc
@@ -1099,6 +1099,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
}

Slice key = input->key();
Slice value = input->value();
Slice* compaction_filter_value = NULL;
if (compact->compaction->ShouldStopBefore(key) &&
compact->builder != NULL) {
status = FinishCompactionOutputFile(compact, input);
@@ -1138,6 +1140,21 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
} else if (options_.CompactionFilter != NULL &&
ikey.type != kTypeDeletion &&
ikey.sequence < compact->smallest_snapshot) {
// If the user has specified a compaction filter, then invoke
// it. If this key is not visible via any snapshot and the
// compaction filter returns true, then drop this key from
// the output.
drop = options_.CompactionFilter(compact->compaction->level(),
ikey.user_key, value, &compaction_filter_value);

// If the application wants to change the value, then do so here.
if (compaction_filter_value != NULL) {
value = *compaction_filter_value;
delete compaction_filter_value;
}
}

last_sequence_for_key = ikey.sequence;
@@ -1164,7 +1181,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
compact->current_output()->smallest.DecodeFrom(key);
}
compact->current_output()->largest.DecodeFrom(key);
compact->builder->Add(key, input->value());
compact->builder->Add(key, value);

// Close output file if it is big enough
if (compact->builder->FileSize() >=
169 changes: 169 additions & 0 deletions db/db_test.cc
@@ -1195,6 +1195,175 @@ TEST(DBTest, RepeatedWritesToSameKey) {
}
}

// This is a static filter used for filtering
// kvs during the compaction process.
static int cfilter_count;
static std::string NEW_VALUE = "NewValue";
static bool keep_filter(int level, const Slice& key,
const Slice& value, Slice** new_value) {
cfilter_count++;
return false;
}
static bool delete_filter(int level, const Slice& key,
const Slice& value, Slice** new_value) {
cfilter_count++;
return true;
}
static bool change_filter(int level, const Slice& key,
const Slice& value, Slice** new_value) {
assert(new_value != NULL);
*new_value = new Slice(NEW_VALUE);
return false;
}

TEST(DBTest, CompactionFilter) {
Options options = CurrentOptions();
options.num_levels = 3;
options.max_mem_compaction_level = 0;
options.CompactionFilter = keep_filter;
Reopen(&options);

// Write 100K+1 keys; these are written to a few files
// in L0. We do this so that the current snapshot points
// to the 100001st key. The compaction filter is not invoked
// on keys that are visible via a snapshot because we
// cannot delete them anyway.
const std::string value(10, 'x');
for (int i = 0; i < 100001; i++) {
char key[100];
snprintf(key, sizeof(key), "B%010d", i);
Put(key, value);
}
dbfull()->TEST_CompactMemTable();

// Push all files to the highest level L2. Verify that
// the compaction at each level invokes the filter for
// all the keys in that level.
cfilter_count = 0;
dbfull()->TEST_CompactRange(0, NULL, NULL);
ASSERT_EQ(cfilter_count, 100000);
cfilter_count = 0;
dbfull()->TEST_CompactRange(1, NULL, NULL);
ASSERT_EQ(cfilter_count, 100000);

ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_NE(NumTableFilesAtLevel(2), 0);
cfilter_count = 0;

// overwrite all the 100K+1 keys once again.
for (int i = 0; i < 100001; i++) {
char key[100];
snprintf(key, sizeof(key), "B%010d", i);
Put(key, value);
}
dbfull()->TEST_CompactMemTable();

// push all files to the highest level L2. This
// means that all keys should pass through the
// compaction filter at least once.
cfilter_count = 0;
dbfull()->TEST_CompactRange(0, NULL, NULL);
ASSERT_EQ(cfilter_count, 100000);
cfilter_count = 0;
dbfull()->TEST_CompactRange(1, NULL, NULL);
ASSERT_EQ(cfilter_count, 100000);
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_NE(NumTableFilesAtLevel(2), 0);

// create a new database with a compaction
// filter that deletes all keys
options.CompactionFilter = delete_filter;
options.create_if_missing = true;
DestroyAndReopen(&options);

// write all the keys once again.
for (int i = 0; i < 100001; i++) {
char key[100];
snprintf(key, sizeof(key), "B%010d", i);
Put(key, value);
}
dbfull()->TEST_CompactMemTable();
ASSERT_NE(NumTableFilesAtLevel(0), 0);
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_EQ(NumTableFilesAtLevel(2), 0);

// Push all files to the highest level L2. This
// triggers the compaction filter to delete all keys,
// verify that at the end of the compaction process,
// nothing is left.
cfilter_count = 0;
dbfull()->TEST_CompactRange(0, NULL, NULL);
ASSERT_EQ(cfilter_count, 100000);
cfilter_count = 0;
dbfull()->TEST_CompactRange(1, NULL, NULL);
ASSERT_EQ(cfilter_count, 0);
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_EQ(NumTableFilesAtLevel(1), 0);

// Scan the entire database to ensure that only the
// 100001st key is left in the db. The 100001st key
// is part of the default (most current) snapshot and
// cannot be deleted.
Iterator* iter = db_->NewIterator(ReadOptions());
iter->SeekToFirst();
int count = 0;
while (iter->Valid()) {
count++;
iter->Next();
}
ASSERT_EQ(count, 1);
delete iter;
}

TEST(DBTest, CompactionFilterWithValueChange) {
Options options = CurrentOptions();
options.num_levels = 3;
options.max_mem_compaction_level = 0;
options.CompactionFilter = change_filter;
Reopen(&options);

// Write 100K+1 keys; these are written to a few files
// in L0. We do this so that the current snapshot points
// to the 100001st key. The compaction filter is not invoked
// on keys that are visible via a snapshot because we
// cannot delete them anyway.
const std::string value(10, 'x');
for (int i = 0; i < 100001; i++) {
char key[100];
snprintf(key, sizeof(key), "B%010d", i);
Put(key, value);
}

// push all files to lower levels
dbfull()->TEST_CompactMemTable();
dbfull()->TEST_CompactRange(0, NULL, NULL);
dbfull()->TEST_CompactRange(1, NULL, NULL);

// re-write all data again
for (int i = 0; i < 100001; i++) {
char key[100];
snprintf(key, sizeof(key), "B%010d", i);
Put(key, value);
}

// push all files to lower levels. This should
// invoke the compaction filter for all 100000 keys.
dbfull()->TEST_CompactMemTable();
dbfull()->TEST_CompactRange(0, NULL, NULL);
dbfull()->TEST_CompactRange(1, NULL, NULL);

// verify that all keys now have the new value that
// was set by the compaction process.
for (int i = 0; i < 100000; i++) {
char key[100];
snprintf(key, sizeof(key), "B%010d", i);
std::string newvalue = Get(key);
ASSERT_EQ(newvalue.compare(NEW_VALUE), 0);
}
}

TEST(DBTest, SparseMerge) {
Options options = CurrentOptions();
options.compression = kNoCompression;
16 changes: 16 additions & 0 deletions include/leveldb/options.h
@@ -8,6 +8,7 @@
#include <stddef.h>
#include <string>
#include <stdint.h>
#include "leveldb/slice.h"

namespace leveldb {

@@ -299,6 +300,21 @@ struct Options {
Options();

void Dump(Logger * log) const;

// This method allows an application to modify/delete a key-value
// pair at the time of compaction. The compaction process invokes
// this method for every kv that is being compacted. A return value
// of false indicates that the kv should be preserved in the
// output of this compaction run, while a return value of true
// indicates that this key-value pair should be removed from the
// output of the compaction. The application can inspect
// the existing value of the key, modify it if needed, and
// return the new value through *new_value. The application
// should allocate the memory for the Slice object used to
// return the new value; the leveldb framework will
// free that memory.
bool (*CompactionFilter)(int level, const Slice& key,
const Slice& existing_value, Slice** new_value);
};

// Options that control read operations
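For reference, a sketch of a filter that rewrites values instead of
dropping keys, following the memory contract described in the comment
above: the replacement Slice is heap-allocated by the application and
freed by leveldb after it is copied. The payload is hypothetical.

    #include <string>
    #include "leveldb/options.h"
    #include "leveldb/slice.h"

    // Static so the bytes outlive the compaction: a Slice does not
    // own the memory it points to; only the Slice object is freed.
    static const std::string kRewritten = "rewritten-value";

    static bool RewriteFilter(int level, const leveldb::Slice& key,
                              const leveldb::Slice& existing_value,
                              leveldb::Slice** new_value) {
      *new_value = new leveldb::Slice(kRewritten);  // deleted by leveldb
      return false;  // keep the key in the compaction output
    }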
5 changes: 4 additions & 1 deletion util/options.cc
@@ -47,7 +47,8 @@ Options::Options()
table_cache_numshardbits(4),
max_log_file_size(0),
delete_obsolete_files_period_micros(0),
rate_limit(0.0) {
rate_limit(0.0),
CompactionFilter(NULL) {
}

void
@@ -123,6 +124,8 @@ Options::Dump(
delete_obsolete_files_period_micros);
Log(log," Options.rate_limit: %.2f",
rate_limit);
Log(log," Options.CompactionFilter: %p",
CompactionFilter);
} // Options::Dump


