Skip to content

Commit

Permalink
Add EventListener::OnExternalFileIngested() event
Browse files Browse the repository at this point in the history
Summary:
Add EventListener::OnExternalFileIngested() to allow user to subscribe to external file ingestion events
Closes facebook#1623

Differential Revision: D4285844

Pulled By: IslamAbdelRahman

fbshipit-source-id: 0b95a88
  • Loading branch information
IslamAbdelRahman authored and Facebook Github Bot committed Dec 6, 2016
1 parent 2005c88 commit ed8fbdb
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 7 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
## Unreleased
### Public API Change
* Support dynamically change `delete_obsolete_files_period_micros` option via SetDBOptions().
* Added EventListener::OnExternalFileIngested which will be called when IngestExternalFile() add a file successfully.

## 5.0.0 (11/17/2016)
### Public API Change
Expand Down
26 changes: 26 additions & 0 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6539,9 +6539,35 @@ Status DBImpl::IngestExternalFile(
// Cleanup
ingestion_job.Cleanup(status);

if (status.ok()) {
NotifyOnExternalFileIngested(cfd, ingestion_job);
}

return status;
}

void DBImpl::NotifyOnExternalFileIngested(
ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job) {
#ifndef ROCKSDB_LITE
if (immutable_db_options_.listeners.empty()) {
return;
}

for (const IngestedFileInfo& f : ingestion_job.files_to_ingest()) {
ExternalFileIngestionInfo info;
info.cf_name = cfd->GetName();
info.external_file_path = f.external_file_path;
info.internal_file_path = f.internal_file_path;
info.global_seqno = f.assigned_seqno;
info.table_properties = f.table_properties;
for (auto listener : immutable_db_options_.listeners) {
listener->OnExternalFileIngested(this, info);
}
}

#endif
}

void DBImpl::WaitForIngestFile() {
mutex_.AssertHeld();
while (num_running_ingest_file_ > 0) {
Expand Down
4 changes: 4 additions & 0 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "db/column_family.h"
#include "db/compaction_job.h"
#include "db/dbformat.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/flush_job.h"
#include "db/flush_scheduler.h"
#include "db/internal_stats.h"
Expand Down Expand Up @@ -556,6 +557,9 @@ class DBImpl : public DB {
void NotifyOnMemTableSealed(ColumnFamilyData* cfd,
const MemTableInfo& mem_table_info);

void NotifyOnExternalFileIngested(
ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job);

void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const;

void EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const;
Expand Down
2 changes: 2 additions & 0 deletions db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(

file_to_ingest->cf_id = static_cast<uint32_t>(props->column_family_id);

file_to_ingest->table_properties = *props;

return status;
}

Expand Down
6 changes: 6 additions & 0 deletions db/external_sst_file_ingestion_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ struct IngestedFileInfo {
uint64_t num_entries;
// Id of column family this file shoule be ingested into
uint32_t cf_id;
// TableProperties read from external file
TableProperties table_properties;
// Version of external file
int version;

Expand Down Expand Up @@ -98,6 +100,10 @@ class ExternalSstFileIngestionJob {

VersionEdit* edit() { return &edit_; }

const autovector<IngestedFileInfo>& files_to_ingest() const {
return files_to_ingest_;
}

private:
// Open the external file and populate `file_to_ingest` with all the
// external information we need to ingest this file.
Expand Down
74 changes: 67 additions & 7 deletions db/external_sst_file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class ExternalSSTFileTest : public DBTestBase {
const Options options,
std::vector<std::pair<std::string, std::string>> data, int file_id = -1,
bool allow_global_seqno = false, bool sort_data = false,
std::map<std::string, std::string>* true_data = nullptr) {
std::map<std::string, std::string>* true_data = nullptr,
ColumnFamilyHandle* cfh = nullptr) {
// Generate a file id if not provided
if (file_id == -1) {
file_id = last_file_id_ + 1;
Expand All @@ -51,7 +52,8 @@ class ExternalSSTFileTest : public DBTestBase {
data.resize(uniq_iter - data.begin());
}
std::string file_path = sst_files_dir_ + ToString(file_id);
SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);
SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator,
cfh);

Status s = sst_file_writer.Open(file_path);
if (!s.ok()) {
Expand All @@ -69,7 +71,11 @@ class ExternalSSTFileTest : public DBTestBase {
if (s.ok()) {
IngestExternalFileOptions ifo;
ifo.allow_global_seqno = allow_global_seqno;
s = db_->IngestExternalFile({file_path}, ifo);
if (cfh) {
s = db_->IngestExternalFile(cfh, {file_path}, ifo);
} else {
s = db_->IngestExternalFile({file_path}, ifo);
}
}

if (s.ok() && true_data) {
Expand All @@ -84,25 +90,29 @@ class ExternalSSTFileTest : public DBTestBase {
Status GenerateAndAddExternalFile(
const Options options, std::vector<std::pair<int, std::string>> data,
int file_id = -1, bool allow_global_seqno = false, bool sort_data = false,
std::map<std::string, std::string>* true_data = nullptr) {
std::map<std::string, std::string>* true_data = nullptr,
ColumnFamilyHandle* cfh = nullptr) {
std::vector<std::pair<std::string, std::string>> file_data;
for (auto& entry : data) {
file_data.emplace_back(Key(entry.first), entry.second);
}
return GenerateAndAddExternalFile(options, file_data, file_id,
allow_global_seqno, sort_data, true_data);
allow_global_seqno, sort_data, true_data,
cfh);
}

Status GenerateAndAddExternalFile(
const Options options, std::vector<int> keys, int file_id = -1,
bool allow_global_seqno = false, bool sort_data = false,
std::map<std::string, std::string>* true_data = nullptr) {
std::map<std::string, std::string>* true_data = nullptr,
ColumnFamilyHandle* cfh = nullptr) {
std::vector<std::pair<std::string, std::string>> file_data;
for (auto& k : keys) {
file_data.emplace_back(Key(k), Key(k) + ToString(file_id));
}
return GenerateAndAddExternalFile(options, file_data, file_id,
allow_global_seqno, sort_data, true_data);
allow_global_seqno, sort_data, true_data,
cfh);
}

Status DeprecatedAddFile(const std::vector<std::string>& files,
Expand Down Expand Up @@ -1835,6 +1845,56 @@ TEST_F(ExternalSSTFileTest, FileWithCFInfo) {
ASSERT_OK(db_->IngestExternalFile(handles_[0], {unknown_sst}, ifo));
}

class TestIngestExternalFileListener : public EventListener {
public:
void OnExternalFileIngested(DB* db,
const ExternalFileIngestionInfo& info) override {
ingested_files.push_back(info);
}

std::vector<ExternalFileIngestionInfo> ingested_files;
};

TEST_F(ExternalSSTFileTest, IngestionListener) {
Options options = CurrentOptions();
TestIngestExternalFileListener* listener =
new TestIngestExternalFileListener();
options.listeners.emplace_back(listener);
CreateAndReopenWithCF({"koko", "toto"}, options);

// Ingest into default cf
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr,
handles_[0]));
ASSERT_EQ(listener->ingested_files.size(), 1);
ASSERT_EQ(listener->ingested_files.back().cf_name, "default");
ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
0);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
"default");

// Ingest into cf1
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr,
handles_[1]));
ASSERT_EQ(listener->ingested_files.size(), 2);
ASSERT_EQ(listener->ingested_files.back().cf_name, "koko");
ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
1);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
"koko");

// Ingest into cf2
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr,
handles_[2]));
ASSERT_EQ(listener->ingested_files.size(), 3);
ASSERT_EQ(listener->ingested_files.back().cf_name, "toto");
ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
2);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
"toto");
}
#endif // ROCKSDB_LITE

} // namespace rocksdb
Expand Down
23 changes: 23 additions & 0 deletions include/rocksdb/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,20 @@ struct MemTableInfo {

};

struct ExternalFileIngestionInfo {
// the name of the column family
std::string cf_name;
// Path of the file outside the DB
std::string external_file_path;
// Path of the file inside the DB
std::string internal_file_path;
// The global sequence number assigned to keys in this file
SequenceNumber global_seqno;
// Table properties of the table being flushed
TableProperties table_properties;
};


// EventListener class contains a set of call-back functions that will
// be called when specific RocksDB event happens such as flush. It can
// be used as a building block for developing custom features such as
Expand Down Expand Up @@ -291,6 +305,15 @@ class EventListener {
virtual void OnColumnFamilyHandleDeletionStarted(ColumnFamilyHandle* handle) {
}

// A call-back function for RocksDB which will be called after an external
// file is ingested using IngestExternalFile.
//
// Note that the this function will run on the same thread as
// IngestExternalFile(), if this function is blocked, IngestExternalFile()
// will be blocked from finishing.
virtual void OnExternalFileIngested(
DB* /*db*/, const ExternalFileIngestionInfo& /*info*/) {}

virtual ~EventListener() {}
};

Expand Down

0 comments on commit ed8fbdb

Please sign in to comment.