Skip to content

Commit

Permalink
Passing table properties to compaction callback
Browse files Browse the repository at this point in the history
Summary: It would be nice to have access to table properties in compaction callbacks. In the MyRocks project, this will make it possible to update optimizer statistics online.

Test Plan: Ran the unit test. Ran MyRocks with the new way of collecting stats.

Reviewers: igor, rven, yhchiang

Reviewed By: yhchiang

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D48267
  • Loading branch information
maykov committed Oct 10, 2015
1 parent 64546af commit 3d07b81
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 20 deletions.
11 changes: 11 additions & 0 deletions db/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,14 @@ class Compaction {
int output_level, VersionStorageInfo* vstorage,
const std::vector<CompactionInputFiles>& inputs);

// Returns a copy of the mapping from output file name to table
// properties. Populated via SetOutputTableProperties() once the
// compaction's output files have been written.
TablePropertiesCollection GetOutputTableProperties() const {
return output_table_properties_;
}

// Stores the table properties of this compaction's output files.
// Takes the collection by value and moves it into the member, so
// callers passing an rvalue incur no copy.
void SetOutputTableProperties(TablePropertiesCollection tp) {
output_table_properties_ = std::move(tp);
}

private:
// mark (or clear) all files that are being compacted
void MarkFilesBeingCompacted(bool mark_as_compacted);
Expand Down Expand Up @@ -273,6 +281,9 @@ class Compaction {

// Does input compression match the output compression?
bool InputCompressionMatchesOutput() const;

// table properties of output files
TablePropertiesCollection output_table_properties_;
};

// Utility function
Expand Down
16 changes: 15 additions & 1 deletion db/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ struct CompactionJob::SubcompactionState {
// One output file generated by a subcompaction.
struct Output {
// Metadata for the generated table file.
FileMetaData meta;
// Whether this output has been completed.
bool finished;
// Properties of the written table, captured when the output file is
// finished (see FinishCompactionOutputFile()).
std::shared_ptr<const TableProperties> table_properties;
};

// State kept for output being generated
Expand Down Expand Up @@ -487,6 +488,16 @@ Status CompactionJob::Run() {
}
}

// Collect the table properties of every output file across all
// subcompactions, keyed by the file's full path, and hand them to the
// Compaction object so listeners can access them after completion.
TablePropertiesCollection tp;
for (const auto& state : compact_->sub_compact_states) {
for (const auto& output : state.outputs) {
auto fn = TableFileName(db_options_.db_paths, output.meta.fd.GetNumber(),
output.meta.fd.GetPathId());
tp[fn] = output.table_properties;
}
}
compact_->compaction->SetOutputTableProperties(std::move(tp));

// Finish up all book-keeping to unify the subcompaction results
AggregateStatistics();
UpdateCompactionStats();
Expand Down Expand Up @@ -814,7 +825,10 @@ Status CompactionJob::FinishCompactionOutputFile(

delete iter;
if (s.ok()) {
TableFileCreationInfo info(sub_compact->builder->GetTableProperties());
// Grab the table properties from the builder: keep a shared copy on the
// current output record (later gathered into the compaction's property
// collection), then move the original into the file-creation event info.
auto tp = sub_compact->builder->GetTableProperties();
sub_compact->current_output()->table_properties =
std::make_shared<TableProperties>(tp);
TableFileCreationInfo info(std::move(tp));
info.db_name = dbname_;
info.cf_name = cfd->GetName();
info.file_path =
Expand Down
26 changes: 18 additions & 8 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1412,15 +1412,16 @@ Status DBImpl::FlushMemTableToOutputFile(
if (s.ok()) {
// may temporarily unlock and lock the mutex.
NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options,
job_context->job_id);
job_context->job_id, flush_job.GetTableProperties());
}
#endif // ROCKSDB_LITE
return s;
}

void DBImpl::NotifyOnFlushCompleted(
ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options, int job_id) {
void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop) {
#ifndef ROCKSDB_LITE
if (db_options_.listeners.size() == 0U) {
return;
Expand Down Expand Up @@ -1450,6 +1451,7 @@ void DBImpl::NotifyOnFlushCompleted(
info.triggered_writes_stop = triggered_writes_stop;
info.smallest_seqno = file_meta->smallest_seqno;
info.largest_seqno = file_meta->largest_seqno;
info.table_properties = prop;
for (auto listener : db_options_.listeners) {
listener->OnFlushCompleted(this, info);
}
Expand Down Expand Up @@ -1795,12 +1797,20 @@ void DBImpl::NotifyOnCompactionCompleted(
info.base_input_level = c->start_level();
info.output_level = c->output_level();
info.stats = compaction_job_stats;
info.table_properties = c->GetOutputTableProperties();
for (size_t i = 0; i < c->num_input_levels(); ++i) {
for (const auto fmd : *c->inputs(i)) {
info.input_files.push_back(
TableFileName(db_options_.db_paths,
fmd->fd.GetNumber(),
fmd->fd.GetPathId()));
auto fn = TableFileName(db_options_.db_paths, fmd->fd.GetNumber(),
fmd->fd.GetPathId());
info.input_files.push_back(fn);
if (info.table_properties.count(fn) == 0) {
std::shared_ptr<const TableProperties> tp;
std::string fname;
auto s = cfd->current()->GetTableProperties(&tp, fmd, &fname);
if (s.ok()) {
info.table_properties[fn] = tp;
}
}
}
}
for (const auto newf : c->edit()->GetNewFiles()) {
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ class DBImpl : public DB {

void NotifyOnFlushCompleted(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id);
int job_id, TableProperties prop);

void NotifyOnCompactionCompleted(ColumnFamilyData* cfd,
Compaction *c, const Status &st,
Expand Down
2 changes: 1 addition & 1 deletion db/event_helpers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void EventHelpers::LogAndNotifyTableFileCreation(
info.table_properties.filter_policy_name;

// user collected properties
for (const auto& prop : info.table_properties.user_collected_properties) {
for (const auto& prop : info.table_properties.readable_properties) {
jwriter << prop.first << prop.second;
}
jwriter.EndObject();
Expand Down
17 changes: 9 additions & 8 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,15 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,

TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
&output_compression_);
s = BuildTable(
dbname_, db_options_.env, *cfd_->ioptions(), env_options_,
cfd_->table_cache(), iter.get(), meta, cfd_->internal_comparator(),
cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
existing_snapshots_, output_compression_,
cfd_->ioptions()->compression_opts,
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
Env::IO_HIGH, &info.table_properties);
s = BuildTable(dbname_, db_options_.env, *cfd_->ioptions(), env_options_,
cfd_->table_cache(), iter.get(), meta,
cfd_->internal_comparator(),
cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
existing_snapshots_, output_compression_,
cfd_->ioptions()->compression_opts,
mutable_cf_options_.paranoid_file_checks,
cfd_->internal_stats(), Env::IO_HIGH, &table_properties_);
info.table_properties = table_properties_;
LogFlush(db_options_.info_log);
}
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
Expand Down
2 changes: 2 additions & 0 deletions db/flush_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class FlushJob {
~FlushJob();

Status Run(FileMetaData* file_meta = nullptr);
// Returns a copy of the properties of the table produced by this flush
// (table_properties_ is populated in WriteLevel0Table()).
TableProperties GetTableProperties() const { return table_properties_; }

private:
void ReportStartedFlush();
Expand All @@ -89,6 +90,7 @@ class FlushJob {
CompressionType output_compression_;
Statistics* stats_;
EventLogger* event_logger_;
TableProperties table_properties_;
};

} // namespace rocksdb
55 changes: 55 additions & 0 deletions db/listener_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,39 @@ class EventListenerTest : public testing::Test {
std::vector<ColumnFamilyHandle*> handles_;
};

struct TestPropertiesCollector : public rocksdb::TablePropertiesCollector {
virtual rocksdb::Status AddUserKey(const rocksdb::Slice& key,
const rocksdb::Slice& value,
rocksdb::EntryType type,
rocksdb::SequenceNumber seq,
uint64_t file_size) {
return Status::OK();
}
virtual rocksdb::Status Finish(rocksdb::UserCollectedProperties* properties) {
properties->insert({"0", "1"});
return Status::OK();
}

virtual const char* Name() const override {
return "TestTablePropertiesCollector";
}

rocksdb::UserCollectedProperties GetReadableProperties() const override {
rocksdb::UserCollectedProperties ret;
ret["2"] = "3";
return ret;
}
};

// Factory that hands out a fresh TestPropertiesCollector for each new
// table file being built.
class TestPropertiesCollectorFactory : public TablePropertiesCollectorFactory {
 public:
  TablePropertiesCollector* CreateTablePropertiesCollector(
      TablePropertiesCollectorFactory::Context context) override {
    return new TestPropertiesCollector();
  }
  const char* Name() const override { return "TestTablePropertiesCollector"; }
};

class TestCompactionListener : public EventListener {
public:
void OnCompactionCompleted(DB *db, const CompactionJobInfo& ci) override {
Expand All @@ -161,6 +194,16 @@ class TestCompactionListener : public EventListener {
ASSERT_GT(ci.output_files.size(), 0U);
ASSERT_EQ(db->GetEnv()->GetThreadID(), ci.thread_id);
ASSERT_GT(ci.thread_id, 0U);

for (auto fl : {ci.input_files, ci.output_files}) {
for (auto fn : fl) {
auto it = ci.table_properties.find(fn);
ASSERT_NE(it, ci.table_properties.end());
auto tp = it->second;
ASSERT_TRUE(tp != nullptr);
ASSERT_EQ(tp->user_collected_properties.find("0")->second, "1");
}
}
}

std::vector<DB*> compacted_dbs_;
Expand All @@ -186,6 +229,8 @@ TEST_F(EventListenerTest, OnSingleDBCompactionTest) {
options.enable_thread_tracking = true;
#endif // ROCKSDB_USING_THREAD_STATUS
options.level0_file_num_compaction_trigger = kNumL0Files;
options.table_properties_collector_factories.push_back(
std::make_shared<TestPropertiesCollectorFactory>());

TestCompactionListener* listener = new TestCompactionListener();
options.listeners.emplace_back(listener);
Expand Down Expand Up @@ -274,6 +319,8 @@ class TestFlushListener : public EventListener {
ASSERT_EQ(prev_fc_info_.file_path, info.file_path);
ASSERT_EQ(db->GetEnv()->GetThreadID(), info.thread_id);
ASSERT_GT(info.thread_id, 0U);
ASSERT_EQ(info.table_properties.user_collected_properties.find("0")->second,
"1");
}

std::vector<std::string> flushed_column_family_names_;
Expand All @@ -299,6 +346,8 @@ TEST_F(EventListenerTest, OnSingleDBFlushTest) {
std::vector<std::string> cf_names = {
"pikachu", "ilya", "muromec", "dobrynia",
"nikitich", "alyosha", "popovich"};
options.table_properties_collector_factories.push_back(
std::make_shared<TestPropertiesCollectorFactory>());
CreateAndReopenWithCF(cf_names, &options);

ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
Expand Down Expand Up @@ -330,6 +379,8 @@ TEST_F(EventListenerTest, MultiCF) {
#endif // ROCKSDB_USING_THREAD_STATUS
TestFlushListener* listener = new TestFlushListener(options.env);
options.listeners.emplace_back(listener);
options.table_properties_collector_factories.push_back(
std::make_shared<TestPropertiesCollectorFactory>());
std::vector<std::string> cf_names = {
"pikachu", "ilya", "muromec", "dobrynia",
"nikitich", "alyosha", "popovich"};
Expand Down Expand Up @@ -360,6 +411,8 @@ TEST_F(EventListenerTest, MultiDBMultiListeners) {
#if ROCKSDB_USING_THREAD_STATUS
options.enable_thread_tracking = true;
#endif // ROCKSDB_USING_THREAD_STATUS
options.table_properties_collector_factories.push_back(
std::make_shared<TestPropertiesCollectorFactory>());
std::vector<TestFlushListener*> listeners;
const int kNumDBs = 5;
const int kNumListeners = 10;
Expand Down Expand Up @@ -454,6 +507,8 @@ TEST_F(EventListenerTest, DisableBGCompaction) {
options.compaction_style = kCompactionStyleNone;
options.compression = kNoCompression;
options.write_buffer_size = 100000; // Small write buffer
options.table_properties_collector_factories.push_back(
std::make_shared<TestPropertiesCollectorFactory>());

CreateAndReopenWithCF({"pikachu"}, &options);
ColumnFamilyMetaData cf_meta;
Expand Down
12 changes: 12 additions & 0 deletions include/rocksdb/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@

#pragma once

#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "rocksdb/compaction_job_stats.h"
#include "rocksdb/status.h"
#include "rocksdb/table_properties.h"

namespace rocksdb {

// Maps a table file's full path to its table properties; used to hand
// per-file properties to flush/compaction listener callbacks.
// Modernized from `typedef` to a `using` alias (the file already relies
// on C++11 features such as std::shared_ptr).
using TablePropertiesCollection =
    std::unordered_map<std::string, std::shared_ptr<const TableProperties>>;

class DB;
class Status;
struct CompactionJobStats;
Expand Down Expand Up @@ -72,6 +77,8 @@ struct FlushJobInfo {
SequenceNumber smallest_seqno;
// The largest sequence number in the newly created file
SequenceNumber largest_seqno;
// Table properties of the table being flushed
TableProperties table_properties;
};

struct CompactionJobInfo {
Expand All @@ -93,8 +100,13 @@ struct CompactionJobInfo {
int output_level;
// the names of the compaction input files.
std::vector<std::string> input_files;

// the names of the compaction output files.
std::vector<std::string> output_files;
// Table properties for input and output tables.
// The map is keyed by values from input_files and output_files.
TablePropertiesCollection table_properties;

// If non-null, this variable stores detailed information
// about this compaction.
CompactionJobStats stats;
Expand Down
1 change: 1 addition & 0 deletions include/rocksdb/table_properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ struct TableProperties {

// user collected properties
UserCollectedProperties user_collected_properties;
UserCollectedProperties readable_properties;

// convert this object to a human readable form
// @prop_delim: delimiter for each property.
Expand Down
3 changes: 2 additions & 1 deletion table/block_based_table_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -873,8 +873,9 @@ TableProperties BlockBasedTableBuilder::GetTableProperties() const {
TableProperties ret = rep_->props;
for (const auto& collector : rep_->table_properties_collectors) {
for (const auto& prop : collector->GetReadableProperties()) {
ret.user_collected_properties.insert(prop);
ret.readable_properties.insert(prop);
}
collector->Finish(&ret.user_collected_properties);
}
return ret;
}
Expand Down

0 comments on commit 3d07b81

Please sign in to comment.