Skip to content

Commit

Permalink
Allow applying CompactionFilter outside of compaction (facebook#8243)
Browse files Browse the repository at this point in the history
Summary:
From HISTORY.md release note:

- Allow `CompactionFilter`s to apply in more table file creation scenarios such as flush and recovery. For compatibility, `CompactionFilter`s by default apply during compaction. Users can customize this behavior by overriding `CompactionFilterFactory::ShouldFilterTableFileCreation()`.
- Removed unused structure `CompactionFilterContext`

Pull Request resolved: facebook#8243

Test Plan: added unit tests

Reviewed By: pdillinger

Differential Revision: D28088089

Pulled By: ajkr

fbshipit-source-id: 0799be7908e3b39fea09fc3f1ab00e13ad817fae
  • Loading branch information
ajkr authored and facebook-github-bot committed May 7, 2021
1 parent 242ac6c commit a639c02
Show file tree
Hide file tree
Showing 9 changed files with 252 additions and 70 deletions.
4 changes: 3 additions & 1 deletion HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@

### New Features
* Add new option allow_stall passed during instance creation of WriteBufferManager. When allow_stall is set, WriteBufferManager will stall all writers shared across multiple DBs and columns if memory usage goes beyond specified WriteBufferManager::buffer_size (soft limit). Stall will be cleared when memory is freed after flush and memory usage goes down below buffer_size.
* Allow `CompactionFilter`s to apply in more table file creation scenarios such as flush and recovery. For compatibility, `CompactionFilter`s by default apply during compaction. Users can customize this behavior by overriding `CompactionFilterFactory::ShouldFilterTableFileCreation()`.
* Added more fields to FilterBuildingContext with LSM details, for custom filter policies that vary behavior based on where they are in the LSM-tree.

### Performace Improvements
### Performance Improvements
* BlockPrefetcher is used by iterators to prefetch data if they anticipate more data to be used in future. It is enabled implicitly by rocksdb. Added change to take in account read pattern if reads are sequential. This would disable prefetching for random reads in MultiGet and iterators as readahead_size is increased exponential doing large prefetches.

### Public API change
* Removed a parameter from TableFactory::NewTableBuilder, which should not be called by user code because TableBuilder is not a public API.
* Removed unused structure `CompactionFilterContext`.
* The `skip_filters` parameter to SstFileWriter is now considered deprecated. Use `BlockBasedTableOptions::filter_policy` to control generation of filters.
* ClockCache is known to have bugs that could lead to crash or corruption, so should not be used until fixed. Use NewLRUCache instead.
* Deprecated backupable_db.h and BackupableDBOptions in favor of new versions with appropriate names: backup_engine.h and BackupEngineOptions. Old API compatibility is preserved.
Expand Down
34 changes: 27 additions & 7 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,26 @@ Status BuildTable(

TableProperties tp;
if (iter->Valid() || !range_del_agg->IsEmpty()) {
std::unique_ptr<CompactionFilter> compaction_filter;
if (ioptions.compaction_filter_factory != nullptr &&
ioptions.compaction_filter_factory->ShouldFilterTableFileCreation(
tboptions.reason)) {
CompactionFilter::Context context;
context.is_full_compaction = false;
context.is_manual_compaction = false;
context.column_family_id = tboptions.column_family_id;
context.reason = tboptions.reason;
compaction_filter =
ioptions.compaction_filter_factory->CreateCompactionFilter(context);
if (compaction_filter != nullptr &&
!compaction_filter->IgnoreSnapshots()) {
s.PermitUncheckedError();
return Status::NotSupported(
"CompactionFilter::IgnoreSnapshots() = false is not supported "
"anymore.");
}
}

TableBuilder* builder;
std::unique_ptr<WritableFileWriter> file_writer;
{
Expand Down Expand Up @@ -143,11 +163,11 @@ Status BuildTable(
builder = NewTableBuilder(tboptions, file_writer.get());
}

MergeHelper merge(env, tboptions.internal_comparator.user_comparator(),
ioptions.merge_operator.get(), nullptr, ioptions.logger,
true /* internal key corruption is not ok */,
snapshots.empty() ? 0 : snapshots.back(),
snapshot_checker);
MergeHelper merge(
env, tboptions.internal_comparator.user_comparator(),
ioptions.merge_operator.get(), compaction_filter.get(), ioptions.logger,
true /* internal key corruption is not ok */,
snapshots.empty() ? 0 : snapshots.back(), snapshot_checker);

std::unique_ptr<BlobFileBuilder> blob_file_builder(
(mutable_cf_options.enable_blob_files && blob_file_additions)
Expand All @@ -165,8 +185,8 @@ Status BuildTable(
snapshot_checker, env, ShouldReportDetailedTime(env, ioptions.stats),
true /* internal key corruption is not ok */, range_del_agg.get(),
blob_file_builder.get(), ioptions.allow_data_in_errors,
/*compaction=*/nullptr,
/*compaction_filter=*/nullptr, /*shutting_down=*/nullptr,
/*compaction=*/nullptr, compaction_filter.get(),
/*shutting_down=*/nullptr,
/*preserve_deletes_seqnum=*/0, /*manual_compaction_paused=*/nullptr,
db_options.info_log, full_history_ts_low);

Expand Down
7 changes: 7 additions & 0 deletions db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -526,10 +526,17 @@ std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter() const {
return nullptr;
}

if (!cfd_->ioptions()
->compaction_filter_factory->ShouldFilterTableFileCreation(
TableFileCreationReason::kCompaction)) {
return nullptr;
}

CompactionFilter::Context context;
context.is_full_compaction = is_full_compaction_;
context.is_manual_compaction = is_manual_compaction_;
context.column_family_id = cfd_->GetID();
context.reason = TableFileCreationReason::kCompaction;
return cfd_->ioptions()->compaction_filter_factory->CreateCompactionFilter(
context);
}
Expand Down
14 changes: 10 additions & 4 deletions db/compaction/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ CompactionIterator::CompactionIterator(
blob_garbage_collection_cutoff_file_number_(
ComputeBlobGarbageCollectionCutoffFileNumber(compaction_.get())),
current_key_committed_(false),
cmp_with_history_ts_low_(0) {
assert(compaction_filter_ == nullptr || compaction_ != nullptr);
cmp_with_history_ts_low_(0),
level_(compaction_ == nullptr ? 0 : compaction_->level()) {
assert(snapshots_ != nullptr);
bottommost_level_ = compaction_ == nullptr
? false
Expand Down Expand Up @@ -232,7 +232,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
if (kTypeBlobIndex == ikey_.type) {
blob_value_.Reset();
filter = compaction_filter_->FilterBlobByKey(
compaction_->level(), filter_key, &compaction_filter_value_,
level_, filter_key, &compaction_filter_value_,
compaction_filter_skip_until_.rep());
if (CompactionFilter::Decision::kUndetermined == filter &&
!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
Expand All @@ -251,6 +251,12 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
valid_ = false;
return false;
}
if (compaction_ == nullptr) {
status_ =
Status::Corruption("Unexpected blob index outside of compaction");
valid_ = false;
return false;
}
const Version* const version = compaction_->input_version();
assert(version);

Expand All @@ -271,7 +277,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
}
if (CompactionFilter::Decision::kUndetermined == filter) {
filter = compaction_filter_->FilterV2(
compaction_->level(), filter_key, value_type,
level_, filter_key, value_type,
blob_value_.empty() ? value_ : blob_value_, &compaction_filter_value_,
compaction_filter_skip_until_.rep());
}
Expand Down
2 changes: 2 additions & 0 deletions db/compaction/compaction_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,8 @@ class CompactionIterator {
// Saved result of ucmp->CompareTimestamp(current_ts_, *full_history_ts_low_)
int cmp_with_history_ts_low_;

const int level_;

bool IsShuttingDown() {
// This is a best-effort facility, so memory_order_relaxed is sufficient.
return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
Expand Down
136 changes: 131 additions & 5 deletions db/db_compaction_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ class DeleteFilter : public CompactionFilter {
return true;
}

bool FilterMergeOperand(int /*level*/, const Slice& /*key*/,
const Slice& /*operand*/) const override {
return true;
}

const char* Name() const override { return "DeleteFilter"; }
};

Expand Down Expand Up @@ -190,18 +195,36 @@ class KeepFilterFactory : public CompactionFilterFactory {
bool compaction_filter_created_;
};

// This filter factory is configured with a `TableFileCreationReason`. Only
// table files created for that reason will undergo filtering. This
// configurability makes it useful to tests for filtering non-compaction table
// files, such as "CompactionFilterFlush" and "CompactionFilterRecovery".
class DeleteFilterFactory : public CompactionFilterFactory {
public:
explicit DeleteFilterFactory(TableFileCreationReason reason)
: reason_(reason) {}

std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override {
if (context.is_manual_compaction) {
return std::unique_ptr<CompactionFilter>(new DeleteFilter());
} else {
EXPECT_EQ(reason_, context.reason);
if (context.reason == TableFileCreationReason::kCompaction &&
!context.is_manual_compaction) {
// Table files created by automatic compaction do not undergo filtering.
// Presumably some tests rely on this.
return std::unique_ptr<CompactionFilter>(nullptr);
}
return std::unique_ptr<CompactionFilter>(new DeleteFilter());
}

bool ShouldFilterTableFileCreation(
TableFileCreationReason reason) const override {
return reason_ == reason;
}

const char* Name() const override { return "DeleteFilterFactory"; }

private:
const TableFileCreationReason reason_;
};

// Delete Filter Factory which ignores snapshots
Expand Down Expand Up @@ -349,7 +372,8 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) {

// create a new database with the compaction
// filter in such a way that it deletes all keys
options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>();
options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>(
TableFileCreationReason::kCompaction);
options.create_if_missing = true;
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
Expand Down Expand Up @@ -421,7 +445,8 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) {
// entries in VersionEdit, but none of the 'AddFile's.
TEST_F(DBTestCompactionFilter, CompactionFilterDeletesAll) {
Options options = CurrentOptions();
options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>();
options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>(
TableFileCreationReason::kCompaction);
options.disable_auto_compactions = true;
options.create_if_missing = true;
DestroyAndReopen(options);
Expand Down Expand Up @@ -450,6 +475,64 @@ TEST_F(DBTestCompactionFilter, CompactionFilterDeletesAll) {
}
#endif // ROCKSDB_LITE

TEST_F(DBTestCompactionFilter, CompactionFilterFlush) {
// Tests a `CompactionFilterFactory` that filters when table file is created
// by flush.
Options options = CurrentOptions();
options.compaction_filter_factory =
std::make_shared<DeleteFilterFactory>(TableFileCreationReason::kFlush);
options.merge_operator = MergeOperators::CreateStringAppendOperator();
Reopen(options);

// Puts and Merges are purged in flush.
ASSERT_OK(Put("a", "v"));
ASSERT_OK(Merge("b", "v"));
ASSERT_OK(Flush());
ASSERT_EQ("NOT_FOUND", Get("a"));
ASSERT_EQ("NOT_FOUND", Get("b"));

// However, Puts and Merges are preserved by recovery.
ASSERT_OK(Put("a", "v"));
ASSERT_OK(Merge("b", "v"));
Reopen(options);
ASSERT_EQ("v", Get("a"));
ASSERT_EQ("v", Get("b"));

// Likewise, compaction does not apply filtering.
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("v", Get("a"));
ASSERT_EQ("v", Get("b"));
}

TEST_F(DBTestCompactionFilter, CompactionFilterRecovery) {
// Tests a `CompactionFilterFactory` that filters when table file is created
// by recovery.
Options options = CurrentOptions();
options.compaction_filter_factory =
std::make_shared<DeleteFilterFactory>(TableFileCreationReason::kRecovery);
options.merge_operator = MergeOperators::CreateStringAppendOperator();
Reopen(options);

// Puts and Merges are purged in recovery.
ASSERT_OK(Put("a", "v"));
ASSERT_OK(Merge("b", "v"));
Reopen(options);
ASSERT_EQ("NOT_FOUND", Get("a"));
ASSERT_EQ("NOT_FOUND", Get("b"));

// However, Puts and Merges are preserved by flush.
ASSERT_OK(Put("a", "v"));
ASSERT_OK(Merge("b", "v"));
ASSERT_OK(Flush());
ASSERT_EQ("v", Get("a"));
ASSERT_EQ("v", Get("b"));

// Likewise, compaction does not apply filtering.
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("v", Get("a"));
ASSERT_EQ("v", Get("b"));
}

TEST_P(DBTestCompactionFilterWithCompactParam,
CompactionFilterWithValueChange) {
Options options = CurrentOptions();
Expand Down Expand Up @@ -842,6 +925,49 @@ TEST_F(DBTestCompactionFilter, IgnoreSnapshotsFalse) {
delete options.compaction_filter;
}

class TestNotSupportedFilterFactory : public CompactionFilterFactory {
public:
explicit TestNotSupportedFilterFactory(TableFileCreationReason reason)
: reason_(reason) {}

bool ShouldFilterTableFileCreation(
TableFileCreationReason reason) const override {
return reason_ == reason;
}

std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& /* context */) override {
return std::unique_ptr<CompactionFilter>(new TestNotSupportedFilter());
}

const char* Name() const override { return "TestNotSupportedFilterFactory"; }

private:
const TableFileCreationReason reason_;
};

TEST_F(DBTestCompactionFilter, IgnoreSnapshotsFalseDuringFlush) {
Options options = CurrentOptions();
options.compaction_filter_factory =
std::make_shared<TestNotSupportedFilterFactory>(
TableFileCreationReason::kFlush);
Reopen(options);

ASSERT_OK(Put("a", "v10"));
ASSERT_TRUE(Flush().IsNotSupported());
}

TEST_F(DBTestCompactionFilter, IgnoreSnapshotsFalseRecovery) {
Options options = CurrentOptions();
options.compaction_filter_factory =
std::make_shared<TestNotSupportedFilterFactory>(
TableFileCreationReason::kRecovery);
Reopen(options);

ASSERT_OK(Put("a", "v10"));
ASSERT_TRUE(TryReopen(options).IsNotSupported());
}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
Loading

0 comments on commit a639c02

Please sign in to comment.