Skip to content

Commit

Permalink
Make compression options configurable. These include window-bits, lev…
Browse files Browse the repository at this point in the history
…el and strategy for ZlibCompression

Summary: Leveldb currently uses windowBits=-14 while using zlib compression.(It was earlier 15). This makes the setting configurable. Related changes here: https://reviews.facebook.net/D6105

Test Plan: make all check

Reviewers: dhruba, MarkCallaghan, sheki, heyongqiang

Differential Revision: https://reviews.facebook.net/D6393
  • Loading branch information
emayanke committed Nov 2, 2012
1 parent 3096fa7 commit 854c66b
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 107 deletions.
7 changes: 7 additions & 0 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,13 @@ void leveldb_options_set_compression(leveldb_options_t* opt, int t) {
opt->rep.compression = static_cast<CompressionType>(t);
}

void leveldb_options_set_compression_options(
leveldb_options_t* opt, int w_bits, int level, int strategy) {
opt->rep.compression_opts.window_bits = w_bits;
opt->rep.compression_opts.level = level;
opt->rep.compression_opts.strategy = strategy;
}

void leveldb_options_set_disable_data_sync(
leveldb_options_t* opt, bool disable_data_sync) {
opt->rep.disableDataSync = disable_data_sync;
Expand Down
1 change: 1 addition & 0 deletions db/c_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ int main(int argc, char** argv) {
leveldb_options_set_block_size(options, 1024);
leveldb_options_set_block_restart_interval(options, 8);
leveldb_options_set_compression(options, leveldb_no_compression);
leveldb_options_set_compression_options(options, -14, -1, 0);

roptions = leveldb_readoptions_create();
leveldb_readoptions_set_verify_checksums(roptions, 1);
Expand Down
25 changes: 15 additions & 10 deletions db/db_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ static uint64_t FLAGS_delete_obsolete_files_period_micros = 0;
static enum leveldb::CompressionType FLAGS_compression_type =
leveldb::kSnappyCompression;

// Allows compression for levels 0 and 1 to be disabled when
// other levels are compressed
// Allows compression for levels 0 and 1 to be disabled when
// other levels are compressed
static int FLAGS_min_level_to_compress = -1;

static int FLAGS_table_cache_numshardbits = 4;
Expand Down Expand Up @@ -509,15 +509,18 @@ class Benchmark {

switch (FLAGS_compression_type) {
case kSnappyCompression:
result = port::Snappy_Compress(text, strlen(text), &compressed);
result = port::Snappy_Compress(Options().compression_opts, text,
strlen(text), &compressed);
name = "Snappy";
break;
case kZlibCompression:
result = port::Zlib_Compress(text, strlen(text), &compressed);
result = port::Zlib_Compress(Options().compression_opts, text,
strlen(text), &compressed);
name = "Zlib";
break;
case kBZip2Compression:
result = port::BZip2_Compress(text, strlen(text), &compressed);
result = port::BZip2_Compress(Options().compression_opts, text,
strlen(text), &compressed);
name = "BZip2";
break;
}
Expand Down Expand Up @@ -855,7 +858,8 @@ class Benchmark {
bool ok = true;
std::string compressed;
while (ok && bytes < 1024 * 1048576) { // Compress 1G
ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
ok = port::Snappy_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
produced += compressed.size();
bytes += input.size();
thread->stats.FinishedSingleOp(NULL);
Expand All @@ -876,7 +880,8 @@ class Benchmark {
RandomGenerator gen;
Slice input = gen.Generate(Options().block_size);
std::string compressed;
bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
bool ok = port::Snappy_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
int64_t bytes = 0;
char* uncompressed = new char[input.size()];
while (ok && bytes < 1024 * 1048576) { // Compress 1G
Expand Down Expand Up @@ -928,7 +933,7 @@ class Benchmark {
for (unsigned int i = 0; i < FLAGS_min_level_to_compress; i++) {
options.compression_per_level[i] = kNoCompression;
}
for (unsigned int i = FLAGS_min_level_to_compress;
for (unsigned int i = FLAGS_min_level_to_compress;
i < FLAGS_num_levels; i++) {
options.compression_per_level[i] = FLAGS_compression_type;
}
Expand Down Expand Up @@ -1352,8 +1357,8 @@ int main(int argc, char** argv) {
else {
fprintf(stdout, "Cannot parse %s\n", argv[i]);
}
} else if (sscanf(argv[i], "--min_level_to_compress=%d%c", &n, &junk) == 1
&& n >= 0) {
} else if (sscanf(argv[i], "--min_level_to_compress=%d%c", &n, &junk) == 1
&& n >= 0) {
FLAGS_min_level_to_compress = n;
} else if (sscanf(argv[i], "--disable_seek_compaction=%d%c", &n, &junk) == 1
&& (n == 0 || n == 1)) {
Expand Down
186 changes: 106 additions & 80 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,23 @@

namespace leveldb {

static bool SnappyCompressionSupported() {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Snappy_Compress(in.data(), in.size(), &out);
}
static bool ZlibCompressionSupported() {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Zlib_Compress(in.data(), in.size(), &out);
}
static bool BZip2CompressionSupported() {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::BZip2_Compress(in.data(), in.size(), &out);
}
static bool SnappyCompressionSupported(const CompressionOptions& options) {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Snappy_Compress(options, in.data(), in.size(), &out);
}

static bool ZlibCompressionSupported(const CompressionOptions& options) {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Zlib_Compress(options, in.data(), in.size(), &out);
}

static bool BZip2CompressionSupported(const CompressionOptions& options) {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::BZip2_Compress(options, in.data(), in.size(), &out);
}

static std::string RandomString(Random* rnd, int len) {
std::string r;
Expand Down Expand Up @@ -1076,57 +1076,60 @@ TEST(DBTest, CompactionTrigger) {
ASSERT_EQ(NumTableFilesAtLevel(1), 1);
}

void MinLevelHelper(DBTest* self, Options& options) {
Random rnd(301);

for (int num = 0;
num < options.level0_file_num_compaction_trigger - 1;
num++)
{
std::vector<std::string> values;
// Write 120KB (12 values, each 10K)
for (int i = 0; i < 12; i++) {
values.push_back(RandomString(&rnd, 10000));
ASSERT_OK(self->Put(Key(i), values[i]));
}
self->dbfull()->TEST_WaitForCompactMemTable();
ASSERT_EQ(self->NumTableFilesAtLevel(0), num + 1);
}

//generate one more file in level-0, and should trigger level-0 compaction
std::vector<std::string> values;
for (int i = 0; i < 12; i++) {
values.push_back(RandomString(&rnd, 10000));
ASSERT_OK(self->Put(Key(i), values[i]));
}
self->dbfull()->TEST_WaitForCompact();

ASSERT_EQ(self->NumTableFilesAtLevel(0), 0);
ASSERT_EQ(self->NumTableFilesAtLevel(1), 1);
}

TEST(DBTest, MinLevelToCompress) {
Options options = CurrentOptions();
options.write_buffer_size = 100<<10; //100KB
options.num_levels = 3;
options.max_mem_compaction_level = 0;
options.level0_file_num_compaction_trigger = 3;
void MinLevelHelper(DBTest* self, Options& options) {
Random rnd(301);

for (int num = 0;
num < options.level0_file_num_compaction_trigger - 1;
num++)
{
std::vector<std::string> values;
// Write 120KB (12 values, each 10K)
for (int i = 0; i < 12; i++) {
values.push_back(RandomString(&rnd, 10000));
ASSERT_OK(self->Put(Key(i), values[i]));
}
self->dbfull()->TEST_WaitForCompactMemTable();
ASSERT_EQ(self->NumTableFilesAtLevel(0), num + 1);
}

//generate one more file in level-0, and should trigger level-0 compaction
std::vector<std::string> values;
for (int i = 0; i < 12; i++) {
values.push_back(RandomString(&rnd, 10000));
ASSERT_OK(self->Put(Key(i), values[i]));
}
self->dbfull()->TEST_WaitForCompact();

ASSERT_EQ(self->NumTableFilesAtLevel(0), 0);
ASSERT_EQ(self->NumTableFilesAtLevel(1), 1);
}

void MinLevelToCompress(CompressionType& type, Options& options, int wbits,
int lev, int strategy) {
fprintf(stderr, "Test with compression options : window_bits = %d, level = %d
, strategy = %d}\n", wbits, lev, strategy);
options.write_buffer_size = 100<<10; //100KB
options.num_levels = 3;
options.max_mem_compaction_level = 0;
options.level0_file_num_compaction_trigger = 3;
options.create_if_missing = true;
CompressionType type;

if (SnappyCompressionSupported()) {
type = kSnappyCompression;
fprintf(stderr, "using snappy\n");
} else if (ZlibCompressionSupported()) {
type = kZlibCompression;
fprintf(stderr, "using zlib\n");
} else if (BZip2CompressionSupported()) {
type = kBZip2Compression;
fprintf(stderr, "using bzip2\n");
} else {
fprintf(stderr, "skipping test, compression disabled\n");
return;
}

if (SnappyCompressionSupported(CompressionOptions(wbits, lev, strategy))) {
type = kSnappyCompression;
fprintf(stderr, "using snappy\n");
} else if (ZlibCompressionSupported(
CompressionOptions(wbits, lev, strategy))) {
type = kZlibCompression;
fprintf(stderr, "using zlib\n");
} else if (BZip2CompressionSupported(
CompressionOptions(wbits, lev, strategy))) {
type = kBZip2Compression;
fprintf(stderr, "using bzip2\n");
} else {
fprintf(stderr, "skipping test, compression disabled\n");
return;
}
options.compression_per_level = new CompressionType[options.num_levels];

// do not compress L0
Expand All @@ -1136,19 +1139,42 @@ TEST(DBTest, MinLevelToCompress) {
for (int i = 1; i < options.num_levels; i++) {
options.compression_per_level[i] = type;
}
Reopen(&options);
MinLevelHelper(this, options);

}
TEST(DBTest, MinLevelToCompress1) {
Options options = CurrentOptions();
CompressionType type;
MinLevelToCompress(type, options, -14, -1, 0);
Reopen(&options);
MinLevelHelper(this, options);

// do not compress L0 and L1
for (int i = 0; i < 2; i++) {
options.compression_per_level[i] = kNoCompression;
}
for (int i = 2; i < options.num_levels; i++) {
options.compression_per_level[i] = type;
}
DestroyAndReopen(&options);
MinLevelHelper(this, options);
}

TEST(DBTest, MinLevelToCompress2) {
Options options = CurrentOptions();
CompressionType type;
MinLevelToCompress(type, options, 15, -1, 0);
Reopen(&options);
MinLevelHelper(this, options);

// do not compress L0 and L1
for (int i = 0; i < 2; i++) {
options.compression_per_level[i] = kNoCompression;
}
for (int i = 2; i < options.num_levels; i++) {
options.compression_per_level[i] = type;
}
DestroyAndReopen(&options);
MinLevelHelper(this, options);
}
DestroyAndReopen(&options);
MinLevelHelper(this, options);
}

TEST(DBTest, RepeatedWritesToSameKey) {
Options options = CurrentOptions();
Expand Down Expand Up @@ -1851,7 +1877,7 @@ TEST(DBTest, SnapshotFiles) {
uint64_t size;
ASSERT_OK(env_->GetFileSize(src, &size));

// record the number and the size of the
// record the number and the size of the
// latest manifest file
if (ParseFileName(files[i].substr(1), &number, &type)) {
if (type == kDescriptorFile) {
Expand All @@ -1866,7 +1892,7 @@ TEST(DBTest, SnapshotFiles) {
ASSERT_OK(env_->NewSequentialFile(src, &srcfile));
WritableFile* destfile;
ASSERT_OK(env_->NewWritableFile(dest, &destfile));

char buffer[4096];
Slice slice;
while (size > 0) {
Expand All @@ -1889,7 +1915,7 @@ TEST(DBTest, SnapshotFiles) {
extras.push_back(RandomString(&rnd, 100000));
ASSERT_OK(Put(Key(i), extras[i]));
}

// verify that data in the snapshot are correct
Options opts;
DB* snapdb;
Expand All @@ -1905,7 +1931,7 @@ TEST(DBTest, SnapshotFiles) {
}
delete snapdb;

// look at the new live files after we added an 'extra' key
// look at the new live files after we added an 'extra' key
// and after we took the first snapshot.
uint64_t new_manifest_number = 0;
uint64_t new_manifest_size = 0;
Expand All @@ -1919,7 +1945,7 @@ TEST(DBTest, SnapshotFiles) {
// previous shapshot.
for (unsigned int i = 0; i < newfiles.size(); i++) {
std::string src = dbname_ + "/" + newfiles[i];
// record the lognumber and the size of the
// record the lognumber and the size of the
// latest manifest file
if (ParseFileName(newfiles[i].substr(1), &number, &type)) {
if (type == kDescriptorFile) {
Expand All @@ -1934,7 +1960,7 @@ TEST(DBTest, SnapshotFiles) {
}
ASSERT_EQ(manifest_number, new_manifest_number);
ASSERT_GT(new_manifest_size, manifest_size);

// release file snapshot
dbfull()->DisableFileDeletions();
}
Expand Down Expand Up @@ -1998,7 +2024,7 @@ TEST(DBTest, ReadCompaction) {
// in some level, indicating that there was a compaction
ASSERT_TRUE(NumTableFilesAtLevel(0) < l1 ||
NumTableFilesAtLevel(1) < l2 ||
NumTableFilesAtLevel(2) < l3);
NumTableFilesAtLevel(2) < l3);
delete options.block_cache;
}
}
Expand Down
18 changes: 17 additions & 1 deletion include/leveldb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,19 @@ enum CompressionType {
kBZip2Compression = 0x3
};

// Compression options for different compression algorithms like Zlib
struct CompressionOptions {
int window_bits;
int level;
int strategy;
CompressionOptions():window_bits(-14),
level(-1),
strategy(0){}
CompressionOptions(int wbits, int lev, int strategy):window_bits(wbits),
level(lev),
strategy(strategy){}
};

// Options to control the behavior of a database (passed to DB::Open)
struct Options {
// -------------------
Expand Down Expand Up @@ -144,10 +157,13 @@ struct Options {
// reponsible for allocating memory and initializing the values in it
// before invoking Open(). The caller is responsible for freeing this
// array and it could be freed anytime after the return from Open().
// This could have been a std::vector but that makes the equivalent
// This could have been a std::vector but that makes the equivalent
// java/C api hard to construct.
CompressionType* compression_per_level;

//different options for compression algorithms
CompressionOptions compression_opts;

// If non-NULL, use the specified filter policy to reduce disk reads.
// Many applications will benefit from passing the result of
// NewBloomFilterPolicy() here.
Expand Down
Loading

0 comments on commit 854c66b

Please sign in to comment.