New Statistics to track Compression/Decompression (facebook#1197)
* Added new statistics and refactored to allow ioptions to be passed around as required to access environment and statistics pointers (and, as a convenient side effect, info_log pointer).

* Prevent incrementing compression counter when compression is turned off in options.

* Prevent incrementing compression counter when compression is turned off in options.

* Added two more supported compression types to test code in db_test.cc

* Prevent incrementing compression counter when compression is turned off in options.

* Added new StatsLevel that excludes compression timing.

* Fixed casting error in coding.h

* Fixed CompressionStatsTest for new StatsLevel.

* Removed unused variable that was breaking the Linux build
jalexanderqed authored and siying committed Jul 19, 2016
1 parent 515b11f commit 9430333
Showing 19 changed files with 255 additions and 114 deletions.
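
A minimal usage sketch (illustrative, not part of this commit) of how the new counters and histograms could be read from application code. CreateDBStatistics(), stats_level_, getTickerCount() and the ticker/histogram names all appear in the diffs below; the database path and the workload are placeholders.

#include <cinttypes>
#include <cstdio>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/statistics.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.compression = rocksdb::kSnappyCompression;  // any supported type
  // Attach a statistics object; kAll also enables the new timing histograms.
  options.statistics = rocksdb::CreateDBStatistics();
  options.statistics->stats_level_ = rocksdb::StatsLevel::kAll;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/compression_stats_demo", &db);
  if (!s.ok()) return 1;

  // ... run a write/read workload so data blocks get compressed and
  // decompressed ...

  // New tickers: how many blocks were compressed / decompressed.
  fprintf(stderr, "blocks compressed:   %" PRIu64 "\n",
          options.statistics->getTickerCount(rocksdb::NUMBER_BLOCK_COMPRESSED));
  fprintf(stderr, "blocks decompressed: %" PRIu64 "\n",
          options.statistics->getTickerCount(
              rocksdb::NUMBER_BLOCK_DECOMPRESSED));

  // The new histograms (BYTES_COMPRESSED, BYTES_DECOMPRESSED,
  // COMPRESSION_TIMES_NANOS, DECOMPRESSION_TIMES_NANOS) show up in the
  // statistics dump as well.
  fprintf(stderr, "%s\n", options.statistics->ToString().c_str());

  delete db;
  return 0;
}
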
2 changes: 2 additions & 0 deletions .gitignore
@@ -64,3 +64,5 @@ java/javadoc
scan_build_report/
t
LOG

db_logs/
75 changes: 75 additions & 0 deletions db/db_test.cc
@@ -1189,6 +1189,12 @@ bool MinLevelToCompress(CompressionType& type, Options& options, int wbits,
} else if (LZ4_Supported()) {
type = kLZ4Compression;
fprintf(stderr, "using lz4\n");
} else if (XPRESS_Supported()) {
type = kXpressCompression;
fprintf(stderr, "using xpress\n");
} else if (ZSTD_Supported()) {
type = kZSTDNotFinalCompression;
fprintf(stderr, "using ZSTD\n");
} else {
fprintf(stderr, "skipping test, compression disabled\n");
return false;
@@ -4685,6 +4691,75 @@ TEST_F(DBTest, EncodeDecompressedBlockSizeTest) {
}
}

TEST_F(DBTest, CompressionStatsTest) {
CompressionType type;

if (Snappy_Supported()) {
type = kSnappyCompression;
fprintf(stderr, "using snappy\n");
} else if (Zlib_Supported()) {
type = kZlibCompression;
fprintf(stderr, "using zlib\n");
} else if (BZip2_Supported()) {
type = kBZip2Compression;
fprintf(stderr, "using bzip2\n");
} else if (LZ4_Supported()) {
type = kLZ4Compression;
fprintf(stderr, "using lz4\n");
} else if (XPRESS_Supported()) {
type = kXpressCompression;
fprintf(stderr, "using xpress\n");
} else if (ZSTD_Supported()) {
type = kZSTDNotFinalCompression;
fprintf(stderr, "using ZSTD\n");
} else {
fprintf(stderr, "skipping test, compression disabled\n");
return;
}

Options options = CurrentOptions();
options.compression = type;
options.statistics = rocksdb::CreateDBStatistics();
options.statistics->stats_level_ = StatsLevel::kAll;
DestroyAndReopen(options);

int kNumKeysWritten = 100000;

// Check that compressions occur and are counted when compression is turned on
Random rnd(301);
for (int i = 0; i < kNumKeysWritten; ++i) {
// compressible string
ASSERT_OK(Put(Key(i), RandomString(&rnd, 128) + std::string(128, 'a')));
}
ASSERT_GT(options.statistics->getTickerCount(NUMBER_BLOCK_COMPRESSED), 0);

for (int i = 0; i < kNumKeysWritten; ++i) {
auto r = Get(Key(i));
}
ASSERT_GT(options.statistics->getTickerCount(NUMBER_BLOCK_DECOMPRESSED), 0);

options.compression = kNoCompression;
DestroyAndReopen(options);
uint64_t currentCompressions =
options.statistics->getTickerCount(NUMBER_BLOCK_COMPRESSED);
uint64_t currentDecompressions =
options.statistics->getTickerCount(NUMBER_BLOCK_DECOMPRESSED);

// Check that compressions do not occur when turned off
for (int i = 0; i < kNumKeysWritten; ++i) {
// compressible string
ASSERT_OK(Put(Key(i), RandomString(&rnd, 128) + std::string(128, 'a')));
}
ASSERT_EQ(options.statistics->getTickerCount(NUMBER_BLOCK_COMPRESSED)
- currentCompressions, 0);

for (int i = 0; i < kNumKeysWritten; ++i) {
auto r = Get(Key(i));
}
ASSERT_EQ(options.statistics->getTickerCount(NUMBER_BLOCK_DECOMPRESSED)
- currentDecompressions, 0);
}

TEST_F(DBTest, MutexWaitStatsDisabledByDefault) {
Options options = CurrentOptions();
options.create_if_missing = true;
7 changes: 3 additions & 4 deletions db/plain_table_db_test.cc
@@ -331,20 +331,19 @@ class TestPlainTableFactory : public PlainTableFactory {
TableProperties* props = nullptr;
auto s =
ReadTableProperties(file.get(), file_size, kPlainTableMagicNumber,
- table_reader_options.ioptions.env,
- table_reader_options.ioptions.info_log, &props);
+ table_reader_options.ioptions, &props);
EXPECT_TRUE(s.ok());

if (store_index_in_file_) {
BlockHandle bloom_block_handle;
s = FindMetaBlock(file.get(), file_size, kPlainTableMagicNumber,
- table_reader_options.ioptions.env,
+ table_reader_options.ioptions,
BloomBlockBuilder::kBloomBlock, &bloom_block_handle);
EXPECT_TRUE(s.ok());

BlockHandle index_block_handle;
s = FindMetaBlock(file.get(), file_size, kPlainTableMagicNumber,
- table_reader_options.ioptions.env,
+ table_reader_options.ioptions,
PlainTableIndexBuilder::kPlainTableIndexBlock,
&index_block_handle);
EXPECT_TRUE(s.ok());
4 changes: 2 additions & 2 deletions db/table_properties_collector_test.cc
@@ -276,7 +276,7 @@ void TestCustomizedTablePropertiesCollector(
new test::StringSource(fwf->contents())));
TableProperties* props;
Status s = ReadTableProperties(fake_file_reader.get(), fwf->contents().size(),
- magic_number, Env::Default(), nullptr, &props);
+ magic_number, ioptions, &props);
std::unique_ptr<TableProperties> props_guard(props);
ASSERT_OK(s);

@@ -417,7 +417,7 @@ void TestInternalKeyPropertiesCollector(
TableProperties* props;
Status s =
ReadTableProperties(reader.get(), fwf->contents().size(), magic_number,
- Env::Default(), nullptr, &props);
+ ioptions, &props);
ASSERT_OK(s);

std::unique_ptr<TableProperties> props_guard(props);
3 changes: 1 addition & 2 deletions db/version_set.cc
@@ -597,8 +597,7 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
new RandomAccessFileReader(std::move(file)));
s = ReadTableProperties(
file_reader.get(), file_meta->fd.GetFileSize(),
- Footer::kInvalidTableMagicNumber /* table's magic number */, vset_->env_,
- ioptions->info_log, &raw_table_properties);
+ Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions, &raw_table_properties);
if (!s.ok()) {
return s;
}
22 changes: 22 additions & 0 deletions include/rocksdb/statistics.h
@@ -179,6 +179,11 @@ enum Tickers : uint32_t {
NUMBER_SUPERVERSION_ACQUIRES,
NUMBER_SUPERVERSION_RELEASES,
NUMBER_SUPERVERSION_CLEANUPS,

// # of compressions/decompressions executed
NUMBER_BLOCK_COMPRESSED,
NUMBER_BLOCK_DECOMPRESSED,

NUMBER_BLOCK_NOT_COMPRESSED,
MERGE_OPERATION_TOTAL_TIME,
FILTER_OPERATION_TOTAL_TIME,
@@ -269,6 +274,8 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{NUMBER_SUPERVERSION_ACQUIRES, "rocksdb.number.superversion_acquires"},
{NUMBER_SUPERVERSION_RELEASES, "rocksdb.number.superversion_releases"},
{NUMBER_SUPERVERSION_CLEANUPS, "rocksdb.number.superversion_cleanups"},
{NUMBER_BLOCK_COMPRESSED, "rocksdb.number.block.compressed"},
{NUMBER_BLOCK_DECOMPRESSED, "rocksdb.number.block.decompressed"},
{NUMBER_BLOCK_NOT_COMPRESSED, "rocksdb.number.block.not_compressed"},
{MERGE_OPERATION_TOTAL_TIME, "rocksdb.merge.operation.time.nanos"},
{FILTER_OPERATION_TOTAL_TIME, "rocksdb.filter.operation.time.nanos"},
@@ -313,6 +320,14 @@ enum Histograms : uint32_t {
BYTES_PER_READ,
BYTES_PER_WRITE,
BYTES_PER_MULTIGET,

// number of bytes compressed/decompressed
// the byte count is the uncompressed size, i.e. before compression / after decompression respectively
BYTES_COMPRESSED,
BYTES_DECOMPRESSED,
COMPRESSION_TIMES_NANOS,
DECOMPRESSION_TIMES_NANOS,

HISTOGRAM_ENUM_MAX, // TODO(ldemailly): enforce HistogramsNameMap match
};

@@ -343,6 +358,10 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
{BYTES_PER_READ, "rocksdb.bytes.per.read"},
{BYTES_PER_WRITE, "rocksdb.bytes.per.write"},
{BYTES_PER_MULTIGET, "rocksdb.bytes.per.multiget"},
{BYTES_COMPRESSED, "rocksdb.bytes.compressed"},
{BYTES_DECOMPRESSED, "rocksdb.bytes.decompressed"},
{COMPRESSION_TIMES_NANOS, "rocksdb.compression.times.nanos"},
{DECOMPRESSION_TIMES_NANOS, "rocksdb.decompression.times.nanos"},
};

struct HistogramData {
@@ -357,6 +376,9 @@ enum StatsLevel {
// Collect all stats except the counters requiring to get time inside the
// mutex lock.
kExceptTimeForMutex,
// Collect all stats except time inside mutex lock AND time spent on
// compression.
kExceptDetailedTimers,
// Collect all stats, including measuring duration of mutex operations.
// If getting time is expensive on the platform to run, it can
// reduce scalability to more threads, especially for writes.
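The new kExceptDetailedTimers level sits between kExceptTimeForMutex and kAll: the block counters above are still recorded, but the compression/decompression timing histograms are skipped. A plausible sketch of the gating helper ShouldReportDetailedTime used in the builder diff below (the helper itself is not part of this excerpt, so the exact signature and behavior are assumptions):

#include "rocksdb/env.h"
#include "rocksdb/statistics.h"

namespace rocksdb {

// Only pay for reading the clock when a statistics object is attached and
// the user opted into the most detailed level (kAll). kExceptDetailedTimers
// and below skip the compression timing histograms but keep the tickers.
inline bool ShouldReportDetailedTime(Env* env, Statistics* stats) {
  return env != nullptr && stats != nullptr &&
         stats->stats_level_ > kExceptDetailedTimers;
}

}  // namespace rocksdb
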
17 changes: 16 additions & 1 deletion table/block_based_table_builder.cc
@@ -651,11 +651,16 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
auto type = r->compression_type;
Slice block_contents;
bool abort_compression = false;

StopWatchNano timer(r->ioptions.env,
ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));

if (raw_block_contents.size() < kCompressionSizeLimit) {
Slice compression_dict;
if (is_data_block && r->compression_dict && r->compression_dict->size()) {
compression_dict = *r->compression_dict;
}

block_contents = CompressBlock(raw_block_contents, r->compression_opts,
&type, r->table_options.format_version,
compression_dict, &r->compressed_output);
@@ -668,7 +673,8 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
BlockContents contents;
Status stat = UncompressBlockContentsForCompressionType(
block_contents.data(), block_contents.size(), &contents,
- r->table_options.format_version, compression_dict, type);
+ r->table_options.format_version, compression_dict, type,
+ r->ioptions);

if (stat.ok()) {
bool compressed_ok = contents.data.compare(raw_block_contents) == 0;
@@ -698,6 +704,15 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
type = kNoCompression;
block_contents = raw_block_contents;
}
else if (type != kNoCompression &&
ShouldReportDetailedTime(r->ioptions.env,
r->ioptions.statistics)) {
MeasureTime(r->ioptions.statistics, COMPRESSION_TIMES_NANOS,
timer.ElapsedNanos());
MeasureTime(r->ioptions.statistics, BYTES_COMPRESSED,
raw_block_contents.size());
RecordTick(r->ioptions.statistics, NUMBER_BLOCK_COMPRESSED);
}

WriteRawBlock(block_contents, type, handle);
r->compressed_output.clear();
(Diffs for the remaining changed files in this commit are not shown.)
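
The read-side (decompression) instrumentation lives in those remaining files; a hedged sketch of how it presumably mirrors the writer's pattern around block decompression. MeasureTime, RecordTick and StopWatchNano are the same internal helpers used in the builder diff above; the function name, parameters, and header path below are placeholders, not names from this commit.

#include <cstddef>
#include <cstdint>

#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "util/statistics.h"  // MeasureTime / RecordTick (internal header, path assumed)

namespace rocksdb {

// Illustrative only -- not code from this commit. Called after a block has
// been successfully decompressed, with the elapsed time taken from a
// StopWatchNano started before the decompression call.
void RecordBlockDecompressed(Env* env, Statistics* stats,
                             uint64_t elapsed_nanos,
                             size_t uncompressed_bytes) {
  if (ShouldReportDetailedTime(env, stats)) {
    MeasureTime(stats, DECOMPRESSION_TIMES_NANOS, elapsed_nanos);
    MeasureTime(stats, BYTES_DECOMPRESSED, uncompressed_bytes);
  }
  RecordTick(stats, NUMBER_BLOCK_DECOMPRESSED);
}

}  // namespace rocksdb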
