Skip to content

Commit

Permalink
Recompress blobs during GC if compression changed (facebook#7331)
Browse files Browse the repository at this point in the history
Summary:
Recompress blobs if compression type is changed.

Pull Request resolved: facebook#7331

Test Plan: `make check`

Reviewed By: ltamasi

Differential Revision: D23437102

Pulled By: jay-zhuang

fbshipit-source-id: bb699ebdad137721d422e42e331d4de8a82a7c5f
  • Loading branch information
jay-zhuang authored and facebook-github-bot committed Sep 2, 2020
1 parent 792d2f9 commit 55bf42a
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 1 deletion.
19 changes: 19 additions & 0 deletions utilities/blob_db/blob_compaction_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -360,11 +360,30 @@ CompactionFilter::BlobDecision BlobIndexCompactionFilterGC::PrepareBlobOutput(

PinnableSlice blob;
CompressionType compression_type = kNoCompression;
std::string compression_output;
if (!ReadBlobFromOldFile(key, blob_index, &blob, false, &compression_type)) {
gc_stats_.SetError();
return BlobDecision::kIOError;
}

// If the compression_type is changed, re-compress it with the new compression
// type.
if (compression_type != blob_db_impl->bdb_options_.compression) {
if (compression_type != kNoCompression) {
const Status status =
blob_db_impl->DecompressSlice(blob, compression_type, &blob);
if (!status.ok()) {
gc_stats_.SetError();
return BlobDecision::kCorruption;
}
}
if (blob_db_impl->bdb_options_.compression != kNoCompression) {
blob_db_impl->GetCompressedSlice(blob, &compression_output);
blob = PinnableSlice(&compression_output);
blob.PinSelf();
}
}

uint64_t new_blob_file_number = 0;
uint64_t new_blob_offset = 0;
if (!WriteBlobToNewFile(key, blob, &new_blob_file_number, &new_blob_offset)) {
Expand Down
155 changes: 154 additions & 1 deletion utilities/blob_db/blob_db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,160 @@ TEST_F(BlobDBTest, DecompressAfterReopen) {
Reopen(bdb_options);
VerifyDB(data);
}
#endif

TEST_F(BlobDBTest, EnableDisableCompressionGC) {
Random rnd(301);
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.enable_garbage_collection = true;
bdb_options.garbage_collection_cutoff = 1.0;
bdb_options.disable_background_tasks = true;
bdb_options.compression = kSnappyCompression;
Open(bdb_options);
std::map<std::string, std::string> data;
size_t data_idx = 0;
for (; data_idx < 100; data_idx++) {
PutRandom("put-key" + ToString(data_idx), &rnd, &data);
}
VerifyDB(data);
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_EQ(kSnappyCompression, blob_files[0]->GetCompressionType());

// disable compression
bdb_options.compression = kNoCompression;
Reopen(bdb_options);

// Add more data with new compression type
for (; data_idx < 200; data_idx++) {
PutRandom("put-key" + ToString(data_idx), &rnd, &data);
}
VerifyDB(data);

blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(2, blob_files.size());
ASSERT_EQ(kNoCompression, blob_files[1]->GetCompressionType());

// Trigger compaction
ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
blob_db_impl()->TEST_DeleteObsoleteFiles();
VerifyDB(data);

blob_files = blob_db_impl()->TEST_GetBlobFiles();
for (auto bfile : blob_files) {
ASSERT_EQ(kNoCompression, bfile->GetCompressionType());
}

// enabling the compression again
bdb_options.compression = kSnappyCompression;
Reopen(bdb_options);

// Add more data with new compression type
for (; data_idx < 300; data_idx++) {
PutRandom("put-key" + ToString(data_idx), &rnd, &data);
}
VerifyDB(data);

// Trigger compaction
ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
blob_db_impl()->TEST_DeleteObsoleteFiles();
VerifyDB(data);

blob_files = blob_db_impl()->TEST_GetBlobFiles();
for (auto bfile : blob_files) {
ASSERT_EQ(kSnappyCompression, bfile->GetCompressionType());
}
}

#ifdef LZ4
// Test switch compression types and run GC, it needs both Snappy and LZ4
// support.
TEST_F(BlobDBTest, ChangeCompressionGC) {
Random rnd(301);
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.enable_garbage_collection = true;
bdb_options.garbage_collection_cutoff = 1.0;
bdb_options.disable_background_tasks = true;
bdb_options.compression = kLZ4Compression;
Open(bdb_options);
std::map<std::string, std::string> data;
size_t data_idx = 0;
for (; data_idx < 100; data_idx++) {
PutRandom("put-key" + ToString(data_idx), &rnd, &data);
}
VerifyDB(data);
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_EQ(kLZ4Compression, blob_files[0]->GetCompressionType());

// Change compression type
bdb_options.compression = kSnappyCompression;
Reopen(bdb_options);

// Add more data with Snappy compression type
for (; data_idx < 200; data_idx++) {
PutRandom("put-key" + ToString(data_idx), &rnd, &data);
}
VerifyDB(data);

// Verify blob file compression type
blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(2, blob_files.size());
ASSERT_EQ(kSnappyCompression, blob_files[1]->GetCompressionType());

ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
VerifyDB(data);

blob_db_impl()->TEST_DeleteObsoleteFiles();
blob_files = blob_db_impl()->TEST_GetBlobFiles();
for (auto bfile : blob_files) {
ASSERT_EQ(kSnappyCompression, bfile->GetCompressionType());
}

// Disable compression
bdb_options.compression = kNoCompression;
Reopen(bdb_options);
for (; data_idx < 300; data_idx++) {
PutRandom("put-key" + ToString(data_idx), &rnd, &data);
}
VerifyDB(data);

ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
VerifyDB(data);

blob_db_impl()->TEST_DeleteObsoleteFiles();
blob_files = blob_db_impl()->TEST_GetBlobFiles();
for (auto bfile : blob_files) {
ASSERT_EQ(kNoCompression, bfile->GetCompressionType());
}

// switching different compression types to generate mixed compression types
bdb_options.compression = kSnappyCompression;
Reopen(bdb_options);
for (; data_idx < 400; data_idx++) {
PutRandom("put-key" + ToString(data_idx), &rnd, &data);
}
VerifyDB(data);

bdb_options.compression = kLZ4Compression;
Reopen(bdb_options);
for (; data_idx < 500; data_idx++) {
PutRandom("put-key" + ToString(data_idx), &rnd, &data);
}
VerifyDB(data);

ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
VerifyDB(data);

blob_db_impl()->TEST_DeleteObsoleteFiles();
blob_files = blob_db_impl()->TEST_GetBlobFiles();
for (auto bfile : blob_files) {
ASSERT_EQ(kLZ4Compression, bfile->GetCompressionType());
}
}
#endif // LZ4
#endif // SNAPPY

TEST_F(BlobDBTest, MultipleWriters) {
Open(BlobDBOptions());
Expand Down

0 comments on commit 55bf42a

Please sign in to comment.