Add Option to Skip Flushing in TableBuilder
SherlockNoMad committed Oct 30, 2015
1 parent 2872e0c commit a6dd083
Showing 21 changed files with 115 additions and 34 deletions.
8 changes: 5 additions & 3 deletions db/builder.cc
@@ -43,11 +43,12 @@ TableBuilder* NewTableBuilder(
int_tbl_prop_collector_factories,
uint32_t column_family_id, WritableFileWriter* file,
const CompressionType compression_type,
const CompressionOptions& compression_opts, const bool skip_filters) {
const CompressionOptions& compression_opts,
const bool skip_filters, const bool skip_flush) {
return ioptions.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, internal_comparator,
int_tbl_prop_collector_factories, compression_type,
compression_opts, skip_filters),
compression_opts, skip_filters, skip_flush),
column_family_id, file);
}

@@ -86,7 +87,8 @@ Status BuildTable(

builder = NewTableBuilder(
ioptions, internal_comparator, int_tbl_prop_collector_factories,
column_family_id, file_writer.get(), compression, compression_opts);
column_family_id, file_writer.get(), compression, compression_opts,
false, env_options.skip_table_builder_flush);
}

MergeHelper merge(env, internal_comparator.user_comparator(),
3 changes: 2 additions & 1 deletion db/builder.h
@@ -41,7 +41,8 @@ TableBuilder* NewTableBuilder(
uint32_t column_family_id, WritableFileWriter* file,
const CompressionType compression_type,
const CompressionOptions& compression_opts,
const bool skip_filters = false);
const bool skip_filters = false,
const bool skip_flush = false);

// Build a Table file from the contents of *iter. The generated file
// will be named according to number specified in meta. On success, the rest of
3 changes: 2 additions & 1 deletion db/compaction_job.cc
@@ -939,11 +939,12 @@ Status CompactionJob::OpenCompactionOutputFile(
// data is going to be found
bool skip_filters =
cfd->ioptions()->optimize_filters_for_hits && bottommost_level_;
bool skip_flush = db_options_.skip_table_builder_flush;
sub_compact->builder.reset(NewTableBuilder(
*cfd->ioptions(), cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), cfd->GetID(),
sub_compact->outfile.get(), sub_compact->compaction->output_compression(),
cfd->ioptions()->compression_opts, skip_filters));
cfd->ioptions()->compression_opts, skip_filters, skip_flush));
LogFlush(db_options_.info_log);
return s;
}
8 changes: 8 additions & 0 deletions db/db_bench.cc
@@ -376,6 +376,12 @@ DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size");
DEFINE_int32(random_access_max_buffer_size, 1024 * 1024,
"Maximum windows randomaccess buffer size");

DEFINE_int32(writable_file_max_buffer_size, 1024 * 1024,
"Maximum write buffer for Writable File");

DEFINE_bool(skip_table_builder_flush, false,
"Skip flushing block in table builder");

DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means"
" use default settings.");
DEFINE_int32(memtable_bloom_bits, 0, "Bloom filter bits per key for memtable. "
@@ -2299,6 +2305,8 @@ class Benchmark {
FLAGS_new_table_reader_for_compaction_inputs;
options.compaction_readahead_size = FLAGS_compaction_readahead_size;
options.random_access_max_buffer_size = FLAGS_random_access_max_buffer_size;
options.writable_file_max_buffer_size = FLAGS_writable_file_max_buffer_size;
options.skip_table_builder_flush = FLAGS_skip_table_builder_flush;
options.statistics = dbstats;
if (FLAGS_enable_io_prio) {
FLAGS_env->LowerThreadPoolIOPriority(Env::LOW);
6 changes: 6 additions & 0 deletions include/rocksdb/env.h
@@ -94,6 +94,12 @@ struct EnvOptions {
// See DBOptions doc
size_t random_access_max_buffer_size;

// See DBOptions doc
size_t writable_file_max_buffer_size = 1024 * 1024;

// See DBOptions doc
bool skip_table_builder_flush = false;

// If not nullptr, write rate limiting is enabled for flush and compaction
RateLimiter* rate_limiter = nullptr;
};
21 changes: 21 additions & 0 deletions include/rocksdb/options.h
@@ -1089,6 +1089,27 @@ struct DBOptions {
// Default: 1 Mb
size_t random_access_max_buffer_size;

// This is the maximum buffer size that is used by WritableFileWriter.
// On Windows, we need to maintain an aligned buffer for writes.
// We allow the buffer to grow until its size hits the limit.
//
// Default: 1024 * 1024 (1 MB)
size_t writable_file_max_buffer_size;

// If true, blocks will not be explicitly flushed to disk while building
// an SST table. Instead, the buffer in WritableFileWriter will take
// care of the flushing when it is full.
//
// On Windows, this option helps a lot when unbuffered I/O
// (allow_os_buffer = false) is used, since it avoids small
// unbuffered disk writes.
//
// Users may also adjust writable_file_max_buffer_size to optimize disk
// I/O size.
//
// Default: false
bool skip_table_builder_flush;

// Use adaptive mutex, which spins in the user space before resorting
// to kernel. This could reduce context switch when the mutex is not
// heavily contended. However, if the mutex is hot, we could end up
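
For orientation, here is a minimal usage sketch of the two options documented above. The database path, buffer size, and the single write are illustrative; the API calls are the usual rocksdb::DB ones, with skip_table_builder_flush and writable_file_max_buffer_size being the fields added in this commit.

#include <cassert>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;

  // Do not flush each data block from the table builder; let the
  // WritableFileWriter buffer absorb the small writes instead.
  options.skip_table_builder_flush = true;
  // Cap for the writer's internal buffer (1 MB is the default).
  options.writable_file_max_buffer_size = 1 << 20;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/testdb", &db);
  assert(s.ok());

  s = db->Put(rocksdb::WriteOptions(), "key", "value");
  assert(s.ok());

  delete db;
  return 0;
}
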
30 changes: 20 additions & 10 deletions table/block_based_table_builder.cc
@@ -465,6 +465,7 @@ struct BlockBasedTableBuilder::Rep {
BlockHandle pending_handle; // Handle to add to index block

std::string compressed_output;
bool skip_flush;
std::unique_ptr<FlushBlockPolicy> flush_block_policy;

std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;
@@ -476,7 +477,8 @@ struct BlockBasedTableBuilder::Rep {
int_tbl_prop_collector_factories,
uint32_t column_family_id, WritableFileWriter* f,
const CompressionType _compression_type,
const CompressionOptions& _compression_opts, const bool skip_filters)
const CompressionOptions& _compression_opts,
const bool skip_filters, const bool skip_flush)
: ioptions(_ioptions),
table_options(table_opt),
internal_comparator(icomparator),
@@ -490,6 +492,7 @@ struct BlockBasedTableBuilder::Rep {
compression_opts(_compression_opts),
filter_block(skip_filters ? nullptr : CreateFilterBlockBuilder(
_ioptions, table_options)),
skip_flush(skip_flush),
flush_block_policy(
table_options.flush_block_policy_factory->NewFlushBlockPolicy(
table_options, data_block)) {
@@ -512,7 +515,8 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
int_tbl_prop_collector_factories,
uint32_t column_family_id, WritableFileWriter* file,
const CompressionType compression_type,
const CompressionOptions& compression_opts, const bool skip_filters) {
const CompressionOptions& compression_opts,
const bool skip_filters, const bool skip_flush) {
BlockBasedTableOptions sanitized_table_options(table_options);
if (sanitized_table_options.format_version == 0 &&
sanitized_table_options.checksum != kCRC32c) {
@@ -526,7 +530,7 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(

rep_ = new Rep(ioptions, sanitized_table_options, internal_comparator,
int_tbl_prop_collector_factories, column_family_id, file,
compression_type, compression_opts, skip_filters);
compression_type, compression_opts, skip_filters, skip_flush);

if (rep_->filter_block != nullptr) {
rep_->filter_block->StartBlock(0);
@@ -552,11 +556,12 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0);
}

auto should_flush = r->flush_block_policy->Update(key, value);
if (should_flush) {
auto should_seal = r->flush_block_policy->Update(key, value);
if (should_seal) {
assert(!r->data_block.empty());
SealDataBlock();
Flush();

// Add item to index block.
// We do not emit the index entry for a block until we have seen the
// first key for the next data block. This allows us to use shorter
@@ -587,19 +592,23 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
}

void BlockBasedTableBuilder::Flush() {
Rep* r = rep_;
if (ok() && !r->skip_flush) {
r->status = r->file->Flush();
}
}

void BlockBasedTableBuilder::SealDataBlock() {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->data_block.empty()) return;
WriteBlock(&r->data_block, &r->pending_handle);
if (ok()) {
r->status = r->file->Flush();
}
if (r->filter_block != nullptr) {
r->filter_block->StartBlock(r->offset);
}
r->props.data_size = r->offset;
++r->props.num_data_blocks;
}

void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
@@ -728,6 +737,7 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
Status BlockBasedTableBuilder::Finish() {
Rep* r = rep_;
bool empty_data_block = r->data_block.empty();
SealDataBlock();
Flush();
assert(!r->closed);
r->closed = true;
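
As an aside, the Flush()/SealDataBlock() split above can be summarized with a small self-contained sketch. The types below are hypothetical stand-ins, not the real BlockBasedTableBuilder internals: sealing always appends the finished block to the file writer's buffer, while the explicit flush becomes a no-op when skip_flush is set.

#include <cstdio>
#include <string>

// Hypothetical stand-in for WritableFileWriter: buffers appends, flushes on demand.
struct BufferedFile {
  std::string buffer;
  void Append(const std::string& data) { buffer += data; }
  void Flush() {
    std::printf("flushing %zu buffered bytes\n", buffer.size());
    buffer.clear();
  }
};

// Hypothetical stand-in for the builder's data path.
struct MiniBuilder {
  BufferedFile* file;
  bool skip_flush;

  // Always emit the finished data block into the writer's buffer.
  void SealDataBlock(const std::string& block) { file->Append(block); }

  // Only push the buffer down to the OS when skip_flush is off;
  // otherwise the writer flushes on its own once its buffer is full.
  void Flush() {
    if (!skip_flush) {
      file->Flush();
    }
  }
};

int main() {
  BufferedFile file;
  MiniBuilder builder{&file, /*skip_flush=*/true};
  builder.SealDataBlock("block-1");
  builder.Flush();  // no-op: the block stays in the writer's buffer
  std::printf("still buffered: %zu bytes\n", file.buffer.size());
  return 0;
}
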
5 changes: 4 additions & 1 deletion table/block_based_table_builder.h
@@ -42,7 +42,8 @@ class BlockBasedTableBuilder : public TableBuilder {
int_tbl_prop_collector_factories,
uint32_t column_family_id, WritableFileWriter* file,
const CompressionType compression_type,
const CompressionOptions& compression_opts, const bool skip_filters);
const CompressionOptions& compression_opts,
const bool skip_filters, const bool skip_flush);

// REQUIRES: Either Finish() or Abandon() has been called.
~BlockBasedTableBuilder();
@@ -101,6 +102,8 @@ class BlockBasedTableBuilder : public TableBuilder {
// REQUIRES: Finish(), Abandon() have not been called
void Flush();

void SealDataBlock();

// Some compression libraries fail when the raw size is bigger than int. If
// uncompressed size is bigger than kCompressionSizeLimit, don't compress it
const uint64_t kCompressionSizeLimit = std::numeric_limits<int>::max();
3 changes: 2 additions & 1 deletion table/block_based_table_factory.cc
@@ -69,7 +69,8 @@ TableBuilder* BlockBasedTableFactory::NewTableBuilder(
table_builder_options.int_tbl_prop_collector_factories, column_family_id,
file, table_builder_options.compression_type,
table_builder_options.compression_opts,
table_builder_options.skip_filters);
table_builder_options.skip_filters,
table_builder_options.skip_flush);

return table_builder;
}
3 changes: 2 additions & 1 deletion table/sst_file_writer.cc
@@ -114,7 +114,8 @@ Status SstFileWriter::Open(const std::string& file_path) {

TableBuilderOptions table_builder_options(
r->ioptions, r->internal_comparator, &int_tbl_prop_collector_factories,
compression_type, r->ioptions.compression_opts, false);
compression_type, r->ioptions.compression_opts, false,
r->env_options.skip_table_builder_flush);
r->file_writer.reset(
new WritableFileWriter(std::move(sst_file), r->env_options));
r->builder.reset(r->ioptions.table_factory->NewTableBuilder(
7 changes: 5 additions & 2 deletions table/table_builder.h
@@ -44,20 +44,23 @@ struct TableBuilderOptions {
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
_int_tbl_prop_collector_factories,
CompressionType _compression_type,
const CompressionOptions& _compression_opts, bool _skip_filters)
const CompressionOptions& _compression_opts,
bool _skip_filters, bool _skip_flush)
: ioptions(_ioptions),
internal_comparator(_internal_comparator),
int_tbl_prop_collector_factories(_int_tbl_prop_collector_factories),
compression_type(_compression_type),
compression_opts(_compression_opts),
skip_filters(_skip_filters) {}
skip_filters(_skip_filters),
skip_flush(_skip_flush) {}
const ImmutableCFOptions& ioptions;
const InternalKeyComparator& internal_comparator;
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories;
CompressionType compression_type;
const CompressionOptions& compression_opts;
bool skip_filters = false;
bool skip_flush = false;
};

// TableBuilder provides the interface used to build a Table
3 changes: 2 additions & 1 deletion table/table_reader_bench.cc
@@ -98,7 +98,8 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options,
tb = opts.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, ikc, &int_tbl_prop_collector_factories,
CompressionType::kNoCompression,
CompressionOptions(), false),
CompressionOptions(), false,
env_options.skip_table_builder_flush),
0, file_writer.get());
} else {
s = DB::Open(opts, dbname, &db);
6 changes: 4 additions & 2 deletions table/table_test.cc
@@ -273,7 +273,8 @@ class TableConstructor: public Constructor {
builder.reset(ioptions.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, internal_comparator,
&int_tbl_prop_collector_factories,
options.compression, CompressionOptions(), false),
options.compression, CompressionOptions(),
false, options.skip_table_builder_flush),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer_.get()));

@@ -1845,7 +1846,8 @@ TEST_F(PlainTableTest, BasicPlainTableProperties) {
int_tbl_prop_collector_factories;
std::unique_ptr<TableBuilder> builder(factory.NewTableBuilder(
TableBuilderOptions(ioptions, ikc, &int_tbl_prop_collector_factories,
kNoCompression, CompressionOptions(), false),
kNoCompression, CompressionOptions(),
false, options.skip_table_builder_flush),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer.get()));

2 changes: 1 addition & 1 deletion tools/sst_dump_test.cc
@@ -61,7 +61,7 @@ void createSST(const std::string& file_name,
tb.reset(opts.table_factory->NewTableBuilder(
TableBuilderOptions(imoptions, ikc, &int_tbl_prop_collector_factories,
CompressionType::kNoCompression, CompressionOptions(),
false),
false, env_options.skip_table_builder_flush),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer.get()));

2 changes: 1 addition & 1 deletion tools/sst_dump_tool.cc
@@ -177,7 +177,7 @@ int SstFileReader::ShowAllCompressionSizes(size_t block_size) {
: CompressionType(i + 1)) {
CompressionOptions compress_opt;
TableBuilderOptions tb_opts(imoptions, ikc, &block_based_table_factories, i,
compress_opt, false);
compress_opt, false, false);
uint64_t file_size = CalculateCompressedTableSize(tb_opts, block_size);
fprintf(stdout, "Compression: %s", compress_type.find(i)->second);
fprintf(stdout, " Size: %" PRIu64 "\n", file_size);
3 changes: 3 additions & 0 deletions util/env.cc
@@ -296,6 +296,9 @@ void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) {
env_options->random_access_max_buffer_size =
options.random_access_max_buffer_size;
env_options->rate_limiter = options.rate_limiter.get();
env_options->writable_file_max_buffer_size =
options.writable_file_max_buffer_size;
env_options->skip_table_builder_flush = options.skip_table_builder_flush;
env_options->allow_fallocate = options.allow_fallocate;
}

13 changes: 4 additions & 9 deletions util/file_reader_writer.cc
@@ -21,10 +21,6 @@

namespace rocksdb {

namespace {
const size_t c_OneMb = (1 << 20);
}

Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
Status s = file_->Read(n, result, scratch);
IOSTATS_ADD(bytes_read, result->size());
@@ -76,9 +72,9 @@ Status WritableFileWriter::Append(const Slice& data) {
}
}

if (buf_.Capacity() < c_OneMb) {
if (buf_.Capacity() < max_buffer_size_) {
size_t desiredCapacity = buf_.Capacity() * 2;
desiredCapacity = std::min(desiredCapacity, c_OneMb);
desiredCapacity = std::min(desiredCapacity, max_buffer_size_);
buf_.AllocateNewBuffer(desiredCapacity);
}
assert(buf_.CurrentSize() == 0);
@@ -102,9 +98,9 @@ Status WritableFileWriter::Append(const Slice& data) {
// We double the buffer here because
// Flush calls do not keep up with the incoming bytes
// This is the only place when buffer is changed with unbuffered I/O
if (buf_.Capacity() < c_OneMb) {
if (buf_.Capacity() < max_buffer_size_) {
size_t desiredCapacity = buf_.Capacity() * 2;
desiredCapacity = std::min(desiredCapacity, c_OneMb);
desiredCapacity = std::min(desiredCapacity, max_buffer_size_);
buf_.AllocateNewBuffer(desiredCapacity);
}
}
@@ -156,7 +152,6 @@ Status WritableFileWriter::Close() {
return s;
}


// write out the cached data to the OS cache
Status WritableFileWriter::Flush() {
Status s;
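
The capacity handling in Append() above replaces the hard-coded c_OneMb cap with max_buffer_size_, doubling the buffer until it reaches that limit. A standalone sketch of that capped doubling (the starting capacity is illustrative, not taken from the source):

#include <algorithm>
#include <cstddef>
#include <cstdio>

int main() {
  const size_t max_buffer_size = 1024 * 1024;  // writable_file_max_buffer_size default
  size_t capacity = 64 * 1024;                 // illustrative starting capacity

  // Mirrors the growth step in WritableFileWriter::Append(): double until the cap.
  while (capacity < max_buffer_size) {
    size_t desired = std::min(capacity * 2, max_buffer_size);
    std::printf("grow %zu -> %zu bytes\n", capacity, desired);
    capacity = desired;
  }
  return 0;
}
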
2 changes: 2 additions & 0 deletions util/file_reader_writer.h
@@ -93,6 +93,7 @@ class WritableFileWriter {
private:
std::unique_ptr<WritableFile> writable_file_;
AlignedBuffer buf_;
size_t max_buffer_size_;
// Actually written data size can be used for truncate
// not counting padding data
uint64_t filesize_;
@@ -113,6 +114,7 @@ class WritableFileWriter {
const EnvOptions& options)
: writable_file_(std::move(file)),
buf_(),
max_buffer_size_(options.writable_file_max_buffer_size),
filesize_(0),
next_write_offset_(0),
pending_sync_(false),