Skip to content

Commit

Permalink
Support for LZ4 compression.
Browse files Browse the repository at this point in the history
  • Loading branch information
alberts committed Feb 8, 2014
1 parent 4159a28 commit df2f922
Show file tree
Hide file tree
Showing 11 changed files with 345 additions and 42 deletions.
14 changes: 13 additions & 1 deletion build_tools/build_detect_platform
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
#
# -DLEVELDB_PLATFORM_POSIX if cstdatomic is present
# -DLEVELDB_PLATFORM_NOATOMIC if it is not
# -DSNAPPY if the Snappy library is present
# -DSNAPPY if the Snappy library is present
# -DLZ4 if the LZ4 library is present
#
# Using gflags in rocksdb:
# Our project depends on gflags, which requires users to take some extra steps
Expand Down Expand Up @@ -244,6 +245,17 @@ EOF
PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -lbz2"
fi

# Test whether lz4 library is installed
$CXX $CFLAGS $COMMON_FLAGS -x c++ - -o /dev/null 2>/dev/null <<EOF
#include <lz4.h>
#include <lz4hc.h>
int main() {}
EOF
if [ "$?" = 0 ]; then
COMMON_FLAGS="$COMMON_FLAGS -DLZ4"
PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -llz4"
fi

# Test whether tcmalloc is available
$CXX $CFLAGS -x c++ - -o /dev/null -ltcmalloc 2>/dev/null <<EOF
int main() {}
Expand Down
138 changes: 118 additions & 20 deletions db/db_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ DEFINE_string(benchmarks,
"randomwithverify,"
"fill100K,"
"crc32c,"
"snappycomp,"
"snappyuncomp,"
"compress,"
"uncompress,"
"acquireload,"
"fillfromstdin,",

Expand Down Expand Up @@ -338,6 +338,10 @@ enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
return rocksdb::kZlibCompression;
else if (!strcasecmp(ctype, "bzip2"))
return rocksdb::kBZip2Compression;
else if (!strcasecmp(ctype, "lz4"))
return rocksdb::kLZ4Compression;
else if (!strcasecmp(ctype, "lz4hc"))
return rocksdb::kLZ4HCCompression;

fprintf(stdout, "Cannot parse compression type '%s'\n", ctype);
return rocksdb::kSnappyCompression; //default value
Expand Down Expand Up @@ -841,7 +845,13 @@ class Benchmark {
case rocksdb::kBZip2Compression:
fprintf(stdout, "Compression: bzip2\n");
break;
}
case rocksdb::kLZ4Compression:
fprintf(stdout, "Compression: lz4\n");
break;
case rocksdb::kLZ4HCCompression:
fprintf(stdout, "Compression: lz4hc\n");
break;
}

switch (FLAGS_rep_factory) {
case kPrefixHash:
Expand Down Expand Up @@ -896,6 +906,16 @@ class Benchmark {
strlen(text), &compressed);
name = "BZip2";
break;
case kLZ4Compression:
result = port::LZ4_Compress(Options().compression_opts, text,
strlen(text), &compressed);
name = "LZ4";
break;
case kLZ4HCCompression:
result = port::LZ4HC_Compress(Options().compression_opts, text,
strlen(text), &compressed);
name = "LZ4HC";
break;
case kNoCompression:
assert(false); // cannot happen
break;
Expand Down Expand Up @@ -1146,10 +1166,10 @@ class Benchmark {
method = &Benchmark::Crc32c;
} else if (name == Slice("acquireload")) {
method = &Benchmark::AcquireLoad;
} else if (name == Slice("snappycomp")) {
method = &Benchmark::SnappyCompress;
} else if (name == Slice("snappyuncomp")) {
method = &Benchmark::SnappyUncompress;
} else if (name == Slice("compress")) {
method = &Benchmark::Compress;
} else if (name == Slice("uncompress")) {
method = &Benchmark::Uncompress;
} else if (name == Slice("heapprofile")) {
HeapProfile();
} else if (name == Slice("stats")) {
Expand Down Expand Up @@ -1302,23 +1322,47 @@ class Benchmark {
if (ptr == nullptr) exit(1); // Disable unused variable warning.
}

void SnappyCompress(ThreadState* thread) {
void Compress(ThreadState *thread) {
RandomGenerator gen;
Slice input = gen.Generate(Options().block_size);
int64_t bytes = 0;
int64_t produced = 0;
bool ok = true;
std::string compressed;
while (ok && bytes < 1024 * 1048576) { // Compress 1G
ok = port::Snappy_Compress(Options().compression_opts, input.data(),

// Compress 1G
while (ok && bytes < int64_t(1) << 30) {
switch (FLAGS_compression_type_e) {
case rocksdb::kSnappyCompression:
ok = port::Snappy_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kZlibCompression:
ok = port::Zlib_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kBZip2Compression:
ok = port::BZip2_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kLZ4Compression:
ok = port::LZ4_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kLZ4HCCompression:
ok = port::LZ4HC_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
default:
ok = false;
}
produced += compressed.size();
bytes += input.size();
thread->stats.FinishedSingleOp(nullptr);
}

if (!ok) {
thread->stats.AddMessage("(snappy failure)");
thread->stats.AddMessage("(compression failure)");
} else {
char buf[100];
snprintf(buf, sizeof(buf), "(output: %.1f%%)",
Expand All @@ -1328,24 +1372,78 @@ class Benchmark {
}
}

void SnappyUncompress(ThreadState* thread) {
void Uncompress(ThreadState *thread) {
RandomGenerator gen;
Slice input = gen.Generate(Options().block_size);
std::string compressed;
bool ok = port::Snappy_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);

bool ok;
switch (FLAGS_compression_type_e) {
case rocksdb::kSnappyCompression:
ok = port::Snappy_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kZlibCompression:
ok = port::Zlib_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kBZip2Compression:
ok = port::BZip2_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kLZ4Compression:
ok = port::LZ4_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kLZ4HCCompression:
ok = port::LZ4HC_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
default:
ok = false;
}

int64_t bytes = 0;
char* uncompressed = new char[input.size()];
while (ok && bytes < 1024 * 1048576) { // Compress 1G
ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
uncompressed);
int decompress_size;
while (ok && bytes < 1024 * 1048576) {
char *uncompressed = nullptr;
switch (FLAGS_compression_type_e) {
case rocksdb::kSnappyCompression:
// allocate here to make comparison fair
uncompressed = new char[input.size()];
ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
uncompressed);
break;
case rocksdb::kZlibCompression:
uncompressed = port::Zlib_Uncompress(
compressed.data(), compressed.size(), &decompress_size);
ok = uncompressed != nullptr;
break;
case rocksdb::kBZip2Compression:
uncompressed = port::BZip2_Uncompress(
compressed.data(), compressed.size(), &decompress_size);
ok = uncompressed != nullptr;
break;
case rocksdb::kLZ4Compression:
uncompressed = port::LZ4_Uncompress(
compressed.data(), compressed.size(), &decompress_size);
ok = uncompressed != nullptr;
break;
case rocksdb::kLZ4HCCompression:
uncompressed = port::LZ4_Uncompress(
compressed.data(), compressed.size(), &decompress_size);
ok = uncompressed != nullptr;
break;
default:
ok = false;
}
delete[] uncompressed;
bytes += input.size();
thread->stats.FinishedSingleOp(nullptr);
}
delete[] uncompressed;

if (!ok) {
thread->stats.AddMessage("(snappy failure)");
thread->stats.AddMessage("(compression failure)");
} else {
thread->stats.AddBytes(bytes);
}
Expand Down
22 changes: 21 additions & 1 deletion db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,19 @@ static bool BZip2CompressionSupported(const CompressionOptions& options) {
return port::BZip2_Compress(options, in.data(), in.size(), &out);
}

static std::string RandomString(Random* rnd, int len) {
static bool LZ4CompressionSupported(const CompressionOptions &options) {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::LZ4_Compress(options, in.data(), in.size(), &out);
}

static bool LZ4HCCompressionSupported(const CompressionOptions &options) {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::LZ4HC_Compress(options, in.data(), in.size(), &out);
}

static std::string RandomString(Random *rnd, int len) {
std::string r;
test::RandomString(rnd, len, &r);
return r;
Expand Down Expand Up @@ -2624,6 +2636,14 @@ bool MinLevelToCompress(CompressionType& type, Options& options, int wbits,
CompressionOptions(wbits, lev, strategy))) {
type = kBZip2Compression;
fprintf(stderr, "using bzip2\n");
} else if (LZ4CompressionSupported(
CompressionOptions(wbits, lev, strategy))) {
type = kLZ4Compression;
fprintf(stderr, "using lz4\n");
} else if (LZ4HCCompressionSupported(
CompressionOptions(wbits, lev, strategy))) {
type = kLZ4HCCompression;
fprintf(stderr, "using lz4hc\n");
} else {
fprintf(stderr, "skipping test, compression disabled\n");
return false;
Expand Down
6 changes: 4 additions & 2 deletions include/rocksdb/c.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,10 @@ extern void rocksdb_options_set_memtable_vector_rep(rocksdb_options_t*);
enum {
rocksdb_no_compression = 0,
rocksdb_snappy_compression = 1,
rocksdb_zlib_compression = 1,
rocksdb_bz2_compression = 1
rocksdb_zlib_compression = 2,
rocksdb_bz2_compression = 3,
rocksdb_lz4_compression = 4,
rocksdb_lz4hc_compression = 5
};
extern void rocksdb_options_set_compression(rocksdb_options_t*, int);

Expand Down
6 changes: 2 additions & 4 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,8 @@ using std::shared_ptr;
enum CompressionType : char {
// NOTE: do not change the values of existing entries, as these are
// part of the persistent format on disk.
kNoCompression = 0x0,
kSnappyCompression = 0x1,
kZlibCompression = 0x2,
kBZip2Compression = 0x3
kNoCompression = 0x0, kSnappyCompression = 0x1, kZlibCompression = 0x2,
kBZip2Compression = 0x3, kLZ4Compression = 0x4, kLZ4HCCompression = 0x5
};

enum CompactionStyle : char {
Expand Down
68 changes: 65 additions & 3 deletions port/port_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@
#include <bzlib.h>
#endif

#if defined(LZ4)
#include <lz4.h>
#include <lz4hc.h>
#endif

#include <stdint.h>
#include <string>
#include <string.h>
Expand Down Expand Up @@ -353,8 +358,8 @@ inline bool BZip2_Compress(const CompressionOptions& opts, const char* input,
return false;
}

inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
int* decompress_size) {
inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
int* decompress_size) {
#ifdef BZIP2
bz_stream _stream;
memset(&_stream, 0, sizeof(bz_stream));
Expand Down Expand Up @@ -409,7 +414,64 @@ inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
return nullptr;
}

inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) {
inline bool LZ4_Compress(const CompressionOptions &opts, const char *input,
size_t length, ::std::string* output) {
#ifdef LZ4
int compressBound = LZ4_compressBound(length);
output->resize(8 + compressBound);
char *p = const_cast<char *>(output->c_str());
memcpy(p, &length, sizeof(length));
size_t outlen;
outlen = LZ4_compress_limitedOutput(input, p + 8, length, compressBound);
if (outlen == 0) {
return false;
}
output->resize(8 + outlen);
return true;
#endif
return false;
}

inline char* LZ4_Uncompress(const char* input_data, size_t input_length,
int* decompress_size) {
#ifdef LZ4
if (input_length < 8) {
return nullptr;
}
int output_len;
memcpy(&output_len, input_data, sizeof(output_len));
char *output = new char[output_len];
*decompress_size = LZ4_decompress_safe_partial(
input_data + 8, output, input_length - 8, output_len, output_len);
if (*decompress_size < 0) {
delete[] output;
return nullptr;
}
return output;
#endif
return nullptr;
}

inline bool LZ4HC_Compress(const CompressionOptions &opts, const char* input,
size_t length, ::std::string* output) {
#ifdef LZ4
int compressBound = LZ4_compressBound(length);
output->resize(8 + compressBound);
char *p = const_cast<char *>(output->c_str());
memcpy(p, &length, sizeof(length));
size_t outlen;
outlen = LZ4_compressHC2_limitedOutput(input, p + 8, length, compressBound,
opts.level);
if (outlen == 0) {
return false;
}
output->resize(8 + outlen);
return true;
#endif
return false;
}

inline bool GetHeapProfile(void (*func)(void *, const char *, int), void *arg) {
return false;
}

Expand Down
Loading

0 comments on commit df2f922

Please sign in to comment.