Skip to content

Commit

Permalink
pass key/value samples through zstd compression dictionary generator
Browse files Browse the repository at this point in the history
Summary:
Instead of using samples directly, we now support passing the samples through zstd's dictionary generator when `CompressionOptions::zstd_max_train_bytes` is set to nonzero. If set to zero, we will use the samples directly as the dictionary -- same as before.

Note this is the first step of facebook#2987, extracted into a separate PR per reviewer request.
Closes facebook#3057

Differential Revision: D6116891

Pulled By: ajkr

fbshipit-source-id: 70ab13cc4c734fa02e554180eed0618b75255497
  • Loading branch information
ajkr authored and facebook-github-bot committed Nov 3, 2017
1 parent c4c1f96 commit 24ad430
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 34 deletions.
16 changes: 16 additions & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,22 @@ Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
" is not linked with the binary.");
}
}
if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
if (!CompressionTypeSupported(CompressionType::kZSTD)) {
// Dictionary trainer is available since v0.6.1, but ZSTD was marked
// stable only since v0.8.0. For now we enable the feature in stable
// versions only.
return Status::InvalidArgument(
"zstd dictionary trainer cannot be used because " +
CompressionTypeToString(CompressionType::kZSTD) +
" is not linked with the binary.");
}
if (cf_options.compression_opts.max_dict_bytes == 0) {
return Status::InvalidArgument(
"The dictionary size limit (`CompressionOptions::max_dict_bytes`) "
"should be nonzero if we're using zstd's dictionary generator.");
}
}
return Status::OK();
}

Expand Down
37 changes: 23 additions & 14 deletions db/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -702,15 +702,18 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
sub_compact->compaction->mutable_cf_options();

// To build compression dictionary, we sample the first output file, assuming
// it'll reach the maximum length, and then use the dictionary for compressing
// subsequent output files. The dictionary may be less than max_dict_bytes if
// the first output file's length is less than the maximum.
// it'll reach the maximum length. We optionally pass these samples through
// zstd's dictionary trainer, or just use them directly. Then, the dictionary
// is used for compressing subsequent output files in the same subcompaction.
const bool kUseZstdTrainer =
cfd->ioptions()->compression_opts.zstd_max_train_bytes > 0;
const size_t kSampleBytes =
kUseZstdTrainer ? cfd->ioptions()->compression_opts.zstd_max_train_bytes
: cfd->ioptions()->compression_opts.max_dict_bytes;
const int kSampleLenShift = 6; // 2^6 = 64-byte samples
std::set<size_t> sample_begin_offsets;
if (bottommost_level_ &&
cfd->ioptions()->compression_opts.max_dict_bytes > 0) {
const size_t kMaxSamples =
cfd->ioptions()->compression_opts.max_dict_bytes >> kSampleLenShift;
if (bottommost_level_ && kSampleBytes > 0) {
const size_t kMaxSamples = kSampleBytes >> kSampleLenShift;
const size_t kOutFileLen = mutable_cf_options->MaxFileSizeForLevel(
compact_->compaction->output_level());
if (kOutFileLen != port::kMaxSizet) {
Expand Down Expand Up @@ -780,11 +783,11 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
}
const auto& c_iter_stats = c_iter->iter_stats();
auto sample_begin_offset_iter = sample_begin_offsets.cbegin();
// data_begin_offset and compression_dict are only valid while generating
// data_begin_offset and dict_sample_data are only valid while generating
// dictionary from the first output file.
size_t data_begin_offset = 0;
std::string compression_dict;
compression_dict.reserve(cfd->ioptions()->compression_opts.max_dict_bytes);
std::string dict_sample_data;
dict_sample_data.reserve(kSampleBytes);

while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
// Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
Expand Down Expand Up @@ -856,7 +859,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
data_elmt_copy_len =
data_end_offset - (data_begin_offset + data_elmt_copy_offset);
}
compression_dict.append(&data_elmt.data()[data_elmt_copy_offset],
dict_sample_data.append(&data_elmt.data()[data_elmt_copy_offset],
data_elmt_copy_len);
if (sample_end_offset > data_end_offset) {
// Didn't finish sample. Try to finish it with the next data_elmt.
Expand Down Expand Up @@ -911,9 +914,15 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
RecordDroppedKeys(range_del_out_stats,
&sub_compact->compaction_job_stats);
if (sub_compact->outputs.size() == 1) {
// Use dictionary from first output file for compression of subsequent
// files.
sub_compact->compression_dict = std::move(compression_dict);
// Use samples from first output file to create dictionary for
// compression of subsequent files.
if (kUseZstdTrainer) {
sub_compact->compression_dict = ZSTD_TrainDictionary(
dict_sample_data, kSampleLenShift,
cfd->ioptions()->compression_opts.max_dict_bytes);
} else {
sub_compact->compression_dict = std::move(dict_sample_data);
}
}
}
}
Expand Down
36 changes: 27 additions & 9 deletions db/db_test2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,7 @@ TEST_F(DBTest2, PresetCompressionDict) {
const size_t kL0FileBytes = 128 << 10;
const size_t kApproxPerBlockOverheadBytes = 50;
const int kNumL0Files = 5;
const int kZstdTrainFactor = 16;

Options options;
options.env = CurrentOptions().env; // Make sure to use any custom env that the test is configured with.
Expand Down Expand Up @@ -1059,17 +1060,34 @@ TEST_F(DBTest2, PresetCompressionDict) {
for (auto compression_type : compression_types) {
options.compression = compression_type;
size_t prev_out_bytes;
for (int i = 0; i < 2; ++i) {
for (int i = 0; i < 3; ++i) {
// First iteration: compress without preset dictionary
// Second iteration: compress with preset dictionary
// To make sure the compression dictionary was actually used, we verify
// the compressed size is smaller in the second iteration. Also in the
// second iteration, verify the data we get out is the same data we put
// in.
if (i) {
options.compression_opts.max_dict_bytes = kBlockSizeBytes;
} else {
options.compression_opts.max_dict_bytes = 0;
// Third iteration (zstd only): compress with zstd-trained dictionary
//
// To make sure the compression dictionary has the intended effect, we
// verify the compressed size is smaller in successive iterations. Also in
// the non-first iterations, verify the data we get out is the same data
// we put in.
switch (i) {
case 0:
options.compression_opts.max_dict_bytes = 0;
options.compression_opts.zstd_max_train_bytes = 0;
break;
case 1:
options.compression_opts.max_dict_bytes = kBlockSizeBytes;
options.compression_opts.zstd_max_train_bytes = 0;
break;
case 2:
if (compression_type != kZSTD) {
continue;
}
options.compression_opts.max_dict_bytes = kBlockSizeBytes;
options.compression_opts.zstd_max_train_bytes =
kZstdTrainFactor * kBlockSizeBytes;
break;
default:
assert(false);
}

options.statistics = rocksdb::CreateDBStatistics();
Expand Down
44 changes: 34 additions & 10 deletions include/rocksdb/advanced_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,23 +90,47 @@ struct CompressionOptions {
int window_bits;
int level;
int strategy;
// Maximum size of dictionary used to prime the compression library. Currently
// this dictionary will be constructed by sampling the first output file in a
// subcompaction when the target level is bottommost. This dictionary will be
// loaded into the compression library before compressing/uncompressing each
// data block of subsequent files in the subcompaction. Effectively, this
// improves compression ratios when there are repetitions across data blocks.
// A value of 0 indicates the feature is disabled.

// Maximum size of dictionaries used to prime the compression library.
// Enabling dictionary can improve compression ratios when there are
// repetitions across data blocks.
//
// The dictionary is created by sampling the SST file data. If
// `zstd_max_train_bytes` is nonzero, the samples are passed through zstd's
// dictionary generator. Otherwise, the random samples are used directly as
// the dictionary.
//
// When compression dictionary is disabled, we compress and write each block
// before buffering data for the next one. When compression dictionary is
// enabled, we buffer all SST file data in-memory so we can sample it, as data
// can only be compressed and written after the dictionary has been finalized.
// So users of this feature may see increased memory usage.
//
// Default: 0.
uint32_t max_dict_bytes;

// Maximum size of training data passed to zstd's dictionary trainer. Using
// zstd's dictionary trainer can achieve even better compression ratio
// improvements than using `max_dict_bytes` alone.
//
// The training data will be used to generate a dictionary of max_dict_bytes.
//
// Default: 0.
uint32_t zstd_max_train_bytes;

CompressionOptions()
: window_bits(-14), level(-1), strategy(0), max_dict_bytes(0) {}
CompressionOptions(int wbits, int _lev, int _strategy, int _max_dict_bytes)
: window_bits(-14),
level(-1),
strategy(0),
max_dict_bytes(0),
zstd_max_train_bytes(0) {}
CompressionOptions(int wbits, int _lev, int _strategy, int _max_dict_bytes,
int _zstd_max_train_bytes)
: window_bits(wbits),
level(_lev),
strategy(_strategy),
max_dict_bytes(_max_dict_bytes) {}
max_dict_bytes(_max_dict_bytes),
zstd_max_train_bytes(_zstd_max_train_bytes) {}
};

enum UpdateStatus { // Return status For inplace update callback
Expand Down
43 changes: 42 additions & 1 deletion util/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@

#if defined(ZSTD)
#include <zstd.h>
#endif
#if ZSTD_VERSION_NUMBER >= 800 // v0.8.0+
#include <zdict.h>
#endif // ZSTD_VERSION_NUMBER >= 800
#endif // ZSTD

#if defined(XPRESS)
#include "port/xpress.h"
Expand Down Expand Up @@ -796,4 +799,42 @@ inline char* ZSTD_Uncompress(const char* input_data, size_t input_length,
return nullptr;
}

inline std::string ZSTD_TrainDictionary(const std::string& samples,
const std::vector<size_t>& sample_lens,
size_t max_dict_bytes) {
// Dictionary trainer is available since v0.6.1, but ZSTD was marked stable
// only since v0.8.0. For now we enable the feature in stable versions only.
#if ZSTD_VERSION_NUMBER >= 800 // v0.8.0+
std::string dict_data(max_dict_bytes, '\0');
size_t dict_len =
ZDICT_trainFromBuffer(&dict_data[0], max_dict_bytes, &samples[0],
&sample_lens[0], sample_lens.size());
if (ZDICT_isError(dict_len)) {
return "";
}
assert(dict_len <= max_dict_bytes);
dict_data.resize(dict_len);
return dict_data;
#else // up to v0.7.x
assert(false);
return "";
#endif // ZSTD_VERSION_NUMBER >= 800
}

inline std::string ZSTD_TrainDictionary(const std::string& samples,
size_t sample_len_shift,
size_t max_dict_bytes) {
// Dictionary trainer is available since v0.6.1, but ZSTD was marked stable
// only since v0.8.0. For now we enable the feature in stable versions only.
#if ZSTD_VERSION_NUMBER >= 800 // v0.8.0+
// skips potential partial sample at the end of "samples"
size_t num_samples = samples.size() >> sample_len_shift;
std::vector<size_t> sample_lens(num_samples, 1 << sample_len_shift);
return ZSTD_TrainDictionary(samples, sample_lens, max_dict_bytes);
#else // up to v0.7.x
assert(false);
return "";
#endif // ZSTD_VERSION_NUMBER >= 800
}

} // namespace rocksdb

0 comments on commit 24ad430

Please sign in to comment.