Skip to content

Commit

Permalink
collecting kValue type tombstone
Browse files Browse the repository at this point in the history
Summary:
In our testing cluster, we found large amount tombstone has been promoted to kValue type from kMerge after reaching the top level of compaction. Since we used to only collecting tombstone in merge operator, those tombstones can never be collected.

This PR addresses the issue by adding a GC step in compaction filter, which is only for kValue type records. Since those record already reached the top of compaction (no earlier data exists) we can safely remove them in compaction filter without worrying old data appears.

This PR also removes an old optimization in cassandra merge operator for single merge operands.  We need to do GC even on a single operand, so the optimation does not make sense anymore.
Closes facebook#2855

Reviewed By: sagar0

Differential Revision: D5806445

Pulled By: wpc

fbshipit-source-id: 6eb25629d4ce917eb5e8b489f64a6aa78c7d270b
  • Loading branch information
wpc authored and facebook-github-bot committed Sep 18, 2017
1 parent 60beefd commit e4234fb
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 38 deletions.
9 changes: 5 additions & 4 deletions java/rocksjni/cassandra_compactionfilterjni.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
/*
* Class: org_rocksdb_CassandraCompactionFilter
* Method: createNewCassandraCompactionFilter0
* Signature: ()J
* Signature: (ZI)J
*/
jlong Java_org_rocksdb_CassandraCompactionFilter_createNewCassandraCompactionFilter0(
JNIEnv* env, jclass jcls, jboolean purge_ttl_on_expiration) {
auto* compaction_filter =
new rocksdb::cassandra::CassandraCompactionFilter(purge_ttl_on_expiration);
JNIEnv* env, jclass jcls, jboolean purge_ttl_on_expiration,
jint gc_grace_period_in_seconds) {
auto* compaction_filter = new rocksdb::cassandra::CassandraCompactionFilter(
purge_ttl_on_expiration, gc_grace_period_in_seconds);
// set the native handle to our native compaction filter
return reinterpret_cast<jlong>(compaction_filter);
}
7 changes: 4 additions & 3 deletions java/src/main/java/org/rocksdb/CassandraCompactionFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
*/
public class CassandraCompactionFilter
extends AbstractCompactionFilter<Slice> {
public CassandraCompactionFilter(boolean purgeTtlOnExpiration) {
super(createNewCassandraCompactionFilter0(purgeTtlOnExpiration));
public CassandraCompactionFilter(boolean purgeTtlOnExpiration, int gcGracePeriodInSeconds) {
super(createNewCassandraCompactionFilter0(purgeTtlOnExpiration, gcGracePeriodInSeconds));
}

private native static long createNewCassandraCompactionFilter0(boolean purgeTtlOnExpiration);
private native static long createNewCassandraCompactionFilter0(
boolean purgeTtlOnExpiration, int gcGracePeriodInSeconds);
}
4 changes: 4 additions & 0 deletions utilities/cassandra/cassandra_compaction_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ CompactionFilter::Decision CassandraCompactionFilter::FilterV2(
? row_value.RemoveExpiredColumns(&value_changed)
: row_value.ConvertExpiredColumnsToTombstones(&value_changed);

if (value_type == ValueType::kValue) {
compacted = compacted.RemoveTombstones(gc_grace_period_in_seconds_);
}

if(compacted.Empty()) {
return Decision::kRemove;
}
Expand Down
25 changes: 14 additions & 11 deletions utilities/cassandra/cassandra_compaction_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,28 @@ namespace cassandra {
* Compaction filter for removing expired Cassandra data with ttl.
* If option `purge_ttl_on_expiration` is set to true, expired data
* will be directly purged. Otherwise expired data will be converted
* tombstones first, then be eventally removed after gc grace period.
* `purge_ttl_on_expiration` should only be on in the case all the
* tombstones first, then be eventally removed after gc grace period.
* `purge_ttl_on_expiration` should only be on in the case all the
* writes have same ttl setting, otherwise it could bring old data back.
*
* Compaction filter is also in charge of removing tombstone that has been
* promoted to kValue type after serials of merging in compaction.
*/
class CassandraCompactionFilter : public CompactionFilter {
public:
explicit CassandraCompactionFilter(bool purge_ttl_on_expiration)
: purge_ttl_on_expiration_(purge_ttl_on_expiration) {}
explicit CassandraCompactionFilter(bool purge_ttl_on_expiration,
int32_t gc_grace_period_in_seconds)
: purge_ttl_on_expiration_(purge_ttl_on_expiration),
gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {}

const char* Name() const override;
virtual Decision FilterV2(int level,
const Slice& key,
ValueType value_type,
const Slice& existing_value,
std::string* new_value,
std::string* skip_until) const override;
const char* Name() const override;
virtual Decision FilterV2(int level, const Slice& key, ValueType value_type,
const Slice& existing_value, std::string* new_value,
std::string* skip_until) const override;

private:
bool purge_ttl_on_expiration_;
int32_t gc_grace_period_in_seconds_;
};
} // namespace cassandra
} // namespace rocksdb
13 changes: 13 additions & 0 deletions utilities/cassandra/cassandra_format_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,19 @@ TEST(ExpiringColumnTest, ExpiringColumn) {
== 0);
}

TEST(TombstoneTest, TombstoneCollectable) {
int32_t now = (int32_t)time(nullptr);
int32_t gc_grace_seconds = 16440;
EXPECT_TRUE(Tombstone(ColumnTypeMask::DELETION_MASK, 0,
now - gc_grace_seconds,
ToMicroSeconds(now - gc_grace_seconds))
.Collectable(gc_grace_seconds));
EXPECT_FALSE(Tombstone(ColumnTypeMask::DELETION_MASK, 0,
now - gc_grace_seconds + 1,
ToMicroSeconds(now - gc_grace_seconds + 1))
.Collectable(gc_grace_seconds));
}

TEST(TombstoneTest, Tombstone) {
int8_t mask = ColumnTypeMask::DELETION_MASK;
int8_t index = 2;
Expand Down
54 changes: 41 additions & 13 deletions utilities/cassandra/cassandra_functional_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,15 @@ const std::string kDbName = test::TmpDir() + "/cassandra_functional_test";
class CassandraStore {
public:
explicit CassandraStore(std::shared_ptr<DB> db)
: db_(db),
merge_option_(),
get_option_() {
: db_(db), write_option_(), get_option_() {
assert(db);
}

bool Append(const std::string& key, const RowValue& val){
std::string result;
val.Serialize(&result);
Slice valSlice(result.data(), result.size());
auto s = db_->Merge(merge_option_, key, valSlice);
auto s = db_->Merge(write_option_, key, valSlice);

if (s.ok()) {
return true;
Expand All @@ -46,6 +44,19 @@ class CassandraStore {
}
}

bool Put(const std::string& key, const RowValue& val) {
std::string result;
val.Serialize(&result);
Slice valSlice(result.data(), result.size());
auto s = db_->Put(write_option_, key, valSlice);
if (s.ok()) {
return true;
} else {
std::cerr << "ERROR " << s.ToString() << std::endl;
return false;
}
}

void Flush() {
dbfull()->TEST_FlushMemTable();
dbfull()->TEST_WaitForCompact();
Expand Down Expand Up @@ -75,21 +86,23 @@ class CassandraStore {

private:
std::shared_ptr<DB> db_;
WriteOptions merge_option_;
WriteOptions write_option_;
ReadOptions get_option_;

DBImpl* dbfull() { return reinterpret_cast<DBImpl*>(db_.get()); }

};

class TestCompactionFilterFactory : public CompactionFilterFactory {
public:
explicit TestCompactionFilterFactory(bool purge_ttl_on_expiration)
: purge_ttl_on_expiration_(purge_ttl_on_expiration) {}

virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override {
return unique_ptr<CompactionFilter>(new CassandraCompactionFilter(purge_ttl_on_expiration_));
explicit TestCompactionFilterFactory(bool purge_ttl_on_expiration,
int32_t gc_grace_period_in_seconds)
: purge_ttl_on_expiration_(purge_ttl_on_expiration),
gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {}

virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override {
return unique_ptr<CompactionFilter>(new CassandraCompactionFilter(
purge_ttl_on_expiration_, gc_grace_period_in_seconds_));
}

virtual const char* Name() const override {
Expand All @@ -98,6 +111,7 @@ class TestCompactionFilterFactory : public CompactionFilterFactory {

private:
bool purge_ttl_on_expiration_;
int32_t gc_grace_period_in_seconds_;
};


Expand All @@ -113,7 +127,8 @@ class CassandraFunctionalTest : public testing::Test {
Options options;
options.create_if_missing = true;
options.merge_operator.reset(new CassandraValueMergeOperator(gc_grace_period_in_seconds_));
auto* cf_factory = new TestCompactionFilterFactory(purge_ttl_on_expiration_);
auto* cf_factory = new TestCompactionFilterFactory(
purge_ttl_on_expiration_, gc_grace_period_in_seconds_);
options.compaction_filter_factory.reset(cf_factory);
EXPECT_OK(DB::Open(options, kDbName, &db));
return std::shared_ptr<DB>(db);
Expand Down Expand Up @@ -275,6 +290,19 @@ TEST_F(CassandraFunctionalTest,
VerifyRowValueColumns(gced.columns_, 0, kColumn, 1, ToMicroSeconds(now));
}

TEST_F(CassandraFunctionalTest, CompactionShouldRemoveTombstoneFromPut) {
purge_ttl_on_expiration_ = true;
CassandraStore store(OpenDb());
int64_t now = time(nullptr);

store.Put("k1", CreateTestRowValue({
std::make_tuple(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
}));

store.Flush();
store.Compact();
ASSERT_FALSE(std::get<0>(store.Get("k1")));
}

} // namespace cassandra
} // namespace rocksdb
Expand Down
7 changes: 0 additions & 7 deletions utilities/cassandra/merge_operator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,6 @@ bool CassandraValueMergeOperator::FullMergeV2(
MergeOperationOutput* merge_out) const {
// Clear the *new_value for writing.
merge_out->new_value.clear();

if (merge_in.existing_value == nullptr && merge_in.operand_list.size() == 1) {
// Only one operand
merge_out->existing_operand = merge_in.operand_list.back();
return true;
}

std::vector<RowValue> row_values;
if (merge_in.existing_value) {
row_values.push_back(
Expand Down

0 comments on commit e4234fb

Please sign in to comment.