Skip to content

Commit

Permalink
Remove deprecated RemoteCompaction API (facebook#9570)
Browse files Browse the repository at this point in the history
Summary:
Remove deprecated remote compaction APIs
`CompactionService::Start()` and `CompactionService::WaitForComplete()`.
Please use `CompactionService::StartV2()`,
`CompactionService::WaitForCompleteV2()` instead, which provides the
same information plus extra data like priority, db_id, etc.

Pull Request resolved: facebook#9570

Test Plan: CI

Reviewed By: riversand963

Differential Revision: D34255969

Pulled By: jay-zhuang

fbshipit-source-id: c6376eccdd1123f1c42ab53771b5f65f8160c325
  • Loading branch information
jay-zhuang authored and facebook-github-bot committed Feb 16, 2022
1 parent c42d0cf commit 31031c0
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 201 deletions.
2 changes: 2 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
* Remove deprecated option DBOptions::new_table_reader_for_compaction_inputs.
* Add Transaction::SetReadTimestampForValidation() and Transaction::SetCommitTimestamp(). Default impl returns NotSupported().
* Add support for decimal patterns to ObjectLibrary::PatternEntry
* Remove deprecated remote compaction APIs `CompactionService::Start()` and `CompactionService::WaitForComplete()`. Please use `CompactionService::StartV2()`, `CompactionService::WaitForCompleteV2()` instead, which provides the same information plus extra data like priority, db_id, etc.

### Behavior Changes
* Disallow the combination of DBOptions.use_direct_io_for_flush_and_compaction == true and DBOptions.writable_file_max_buffer_size == 0. This combination can cause WritableFileWriter::Append() to loop forever, and it does not make much sense in direct IO.
* `ReadOptions::total_order_seek` no longer affects `DB::Get()`. The original motivation for this interaction has been obsolete since RocksDB has been able to detect whether the current prefix extractor is compatible with that used to generate table files, probably RocksDB 5.14.0.
Expand Down
229 changes: 55 additions & 174 deletions db/compaction/compaction_service_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,126 +10,7 @@

namespace ROCKSDB_NAMESPACE {

class TestCompactionServiceBase {
public:
virtual int GetCompactionNum() = 0;

void OverrideStartStatus(CompactionServiceJobStatus s) {
is_override_start_status = true;
override_start_status = s;
}

void OverrideWaitStatus(CompactionServiceJobStatus s) {
is_override_wait_status = true;
override_wait_status = s;
}

void OverrideWaitResult(std::string str) {
is_override_wait_result = true;
override_wait_result = std::move(str);
}

void ResetOverride() {
is_override_wait_result = false;
is_override_start_status = false;
is_override_wait_status = false;
}

virtual ~TestCompactionServiceBase() = default;

protected:
bool is_override_start_status = false;
CompactionServiceJobStatus override_start_status =
CompactionServiceJobStatus::kFailure;
bool is_override_wait_status = false;
CompactionServiceJobStatus override_wait_status =
CompactionServiceJobStatus::kFailure;
bool is_override_wait_result = false;
std::string override_wait_result;
};

class MyTestCompactionServiceLegacy : public CompactionService,
public TestCompactionServiceBase {
public:
MyTestCompactionServiceLegacy(std::string db_path, Options& options,
std::shared_ptr<Statistics>& statistics)
: db_path_(std::move(db_path)),
options_(options),
statistics_(statistics) {}

static const char* kClassName() { return "MyTestCompactionServiceLegacy"; }

const char* Name() const override { return kClassName(); }

CompactionServiceJobStatus Start(const std::string& compaction_service_input,
uint64_t job_id) override {
InstrumentedMutexLock l(&mutex_);
jobs_.emplace(job_id, compaction_service_input);
CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
if (is_override_start_status) {
return override_start_status;
}
return s;
}

CompactionServiceJobStatus WaitForComplete(
uint64_t job_id, std::string* compaction_service_result) override {
std::string compaction_input;
{
InstrumentedMutexLock l(&mutex_);
auto i = jobs_.find(job_id);
if (i == jobs_.end()) {
return CompactionServiceJobStatus::kFailure;
}
compaction_input = std::move(i->second);
jobs_.erase(i);
}

if (is_override_wait_status) {
return override_wait_status;
}

CompactionServiceOptionsOverride options_override;
options_override.env = options_.env;
options_override.file_checksum_gen_factory =
options_.file_checksum_gen_factory;
options_override.comparator = options_.comparator;
options_override.merge_operator = options_.merge_operator;
options_override.compaction_filter = options_.compaction_filter;
options_override.compaction_filter_factory =
options_.compaction_filter_factory;
options_override.prefix_extractor = options_.prefix_extractor;
options_override.table_factory = options_.table_factory;
options_override.sst_partitioner_factory = options_.sst_partitioner_factory;
options_override.statistics = statistics_;

Status s = DB::OpenAndCompact(
db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(job_id),
compaction_input, compaction_service_result, options_override);
if (is_override_wait_result) {
*compaction_service_result = override_wait_result;
}
compaction_num_.fetch_add(1);
if (s.ok()) {
return CompactionServiceJobStatus::kSuccess;
} else {
return CompactionServiceJobStatus::kFailure;
}
}

int GetCompactionNum() override { return compaction_num_.load(); }

private:
InstrumentedMutex mutex_;
std::atomic_int compaction_num_{0};
std::map<uint64_t, std::string> jobs_;
const std::string db_path_;
Options options_;
std::shared_ptr<Statistics> statistics_;
};

class MyTestCompactionService : public CompactionService,
public TestCompactionServiceBase {
class MyTestCompactionService : public CompactionService {
public:
MyTestCompactionService(std::string db_path, Options& options,
std::shared_ptr<Statistics>& statistics)
Expand All @@ -151,8 +32,8 @@ class MyTestCompactionService : public CompactionService,
assert(info.db_name == db_path_);
jobs_.emplace(info.job_id, compaction_service_input);
CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
if (is_override_start_status) {
return override_start_status;
if (is_override_start_status_) {
return override_start_status_;
}
return s;
}
Expand All @@ -173,8 +54,8 @@ class MyTestCompactionService : public CompactionService,
jobs_.erase(i);
}

if (is_override_wait_status) {
return override_wait_status;
if (is_override_wait_status_) {
return override_wait_status_;
}

CompactionServiceOptionsOverride options_override;
Expand All @@ -194,8 +75,8 @@ class MyTestCompactionService : public CompactionService,
Status s = DB::OpenAndCompact(
db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(info.job_id),
compaction_input, compaction_service_result, options_override);
if (is_override_wait_result) {
*compaction_service_result = override_wait_result;
if (is_override_wait_result_) {
*compaction_service_result = override_wait_result_;
}
compaction_num_.fetch_add(1);
if (s.ok()) {
Expand All @@ -205,11 +86,32 @@ class MyTestCompactionService : public CompactionService,
}
}

int GetCompactionNum() override { return compaction_num_.load(); }
int GetCompactionNum() { return compaction_num_.load(); }

CompactionServiceJobInfo GetCompactionInfoForStart() { return start_info_; }
CompactionServiceJobInfo GetCompactionInfoForWait() { return wait_info_; }

void OverrideStartStatus(CompactionServiceJobStatus s) {
is_override_start_status_ = true;
override_start_status_ = s;
}

void OverrideWaitStatus(CompactionServiceJobStatus s) {
is_override_wait_status_ = true;
override_wait_status_ = s;
}

void OverrideWaitResult(std::string str) {
is_override_wait_result_ = true;
override_wait_result_ = std::move(str);
}

void ResetOverride() {
is_override_wait_result_ = false;
is_override_start_status_ = false;
is_override_wait_status_ = false;
}

private:
InstrumentedMutex mutex_;
std::atomic_int compaction_num_{0};
Expand All @@ -219,17 +121,17 @@ class MyTestCompactionService : public CompactionService,
std::shared_ptr<Statistics> statistics_;
CompactionServiceJobInfo start_info_;
CompactionServiceJobInfo wait_info_;
bool is_override_start_status_ = false;
CompactionServiceJobStatus override_start_status_ =
CompactionServiceJobStatus::kFailure;
bool is_override_wait_status_ = false;
CompactionServiceJobStatus override_wait_status_ =
CompactionServiceJobStatus::kFailure;
bool is_override_wait_result_ = false;
std::string override_wait_result_;
};

// This is only for listing test classes
enum TestCompactionServiceType {
MyTestCompactionServiceType,
MyTestCompactionServiceLegacyType,
};

class CompactionServiceTest
: public DBTestBase,
public testing::WithParamInterface<TestCompactionServiceType> {
class CompactionServiceTest : public DBTestBase {
public:
explicit CompactionServiceTest()
: DBTestBase("compaction_service_test", true) {}
Expand All @@ -240,19 +142,9 @@ class CompactionServiceTest
primary_statistics_ = CreateDBStatistics();
options->statistics = primary_statistics_;
compactor_statistics_ = CreateDBStatistics();
TestCompactionServiceType cs_type = GetParam();
switch (cs_type) {
case MyTestCompactionServiceType:
compaction_service_ = std::make_shared<MyTestCompactionService>(
dbname_, *options, compactor_statistics_);
break;
case MyTestCompactionServiceLegacyType:
compaction_service_ = std::make_shared<MyTestCompactionServiceLegacy>(
dbname_, *options, compactor_statistics_);
break;
default:
assert(false);
}

compaction_service_ = std::make_shared<MyTestCompactionService>(
dbname_, *options, compactor_statistics_);
options->compaction_service = compaction_service_;
DestroyAndReopen(*options);
}
Expand All @@ -261,9 +153,9 @@ class CompactionServiceTest

Statistics* GetPrimaryStatistics() { return primary_statistics_.get(); }

TestCompactionServiceBase* GetCompactionService() {
MyTestCompactionService* GetCompactionService() {
CompactionService* cs = compaction_service_.get();
return dynamic_cast<TestCompactionServiceBase*>(cs);
return static_cast_with_check<MyTestCompactionService>(cs);
}

void GenerateTestData() {
Expand Down Expand Up @@ -306,7 +198,7 @@ class CompactionServiceTest
std::shared_ptr<CompactionService> compaction_service_;
};

TEST_P(CompactionServiceTest, BasicCompactions) {
TEST_F(CompactionServiceTest, BasicCompactions) {
Options options = CurrentOptions();
ReopenWithCompactionService(&options);

Expand Down Expand Up @@ -393,7 +285,7 @@ TEST_P(CompactionServiceTest, BasicCompactions) {
ASSERT_TRUE(s.IsAborted());
}

TEST_P(CompactionServiceTest, ManualCompaction) {
TEST_F(CompactionServiceTest, ManualCompaction) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
ReopenWithCompactionService(&options);
Expand Down Expand Up @@ -430,7 +322,7 @@ TEST_P(CompactionServiceTest, ManualCompaction) {
VerifyTestData();
}

TEST_P(CompactionServiceTest, FailedToStart) {
TEST_F(CompactionServiceTest, FailedToStart) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
ReopenWithCompactionService(&options);
Expand All @@ -448,7 +340,7 @@ TEST_P(CompactionServiceTest, FailedToStart) {
ASSERT_TRUE(s.IsIncomplete());
}

TEST_P(CompactionServiceTest, InvalidResult) {
TEST_F(CompactionServiceTest, InvalidResult) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
ReopenWithCompactionService(&options);
Expand All @@ -466,7 +358,7 @@ TEST_P(CompactionServiceTest, InvalidResult) {
ASSERT_FALSE(s.ok());
}

TEST_P(CompactionServiceTest, SubCompaction) {
TEST_F(CompactionServiceTest, SubCompaction) {
Options options = CurrentOptions();
options.max_subcompactions = 10;
options.target_file_size_base = 1 << 10; // 1KB
Expand Down Expand Up @@ -505,7 +397,7 @@ class PartialDeleteCompactionFilter : public CompactionFilter {
const char* Name() const override { return "PartialDeleteCompactionFilter"; }
};

TEST_P(CompactionServiceTest, CompactionFilter) {
TEST_F(CompactionServiceTest, CompactionFilter) {
Options options = CurrentOptions();
std::unique_ptr<CompactionFilter> delete_comp_filter(
new PartialDeleteCompactionFilter());
Expand Down Expand Up @@ -546,7 +438,7 @@ TEST_P(CompactionServiceTest, CompactionFilter) {
ASSERT_GE(my_cs->GetCompactionNum(), 1);
}

TEST_P(CompactionServiceTest, Snapshot) {
TEST_F(CompactionServiceTest, Snapshot) {
Options options = CurrentOptions();
ReopenWithCompactionService(&options);

Expand All @@ -567,7 +459,7 @@ TEST_P(CompactionServiceTest, Snapshot) {
db_->ReleaseSnapshot(s1);
}

TEST_P(CompactionServiceTest, ConcurrentCompaction) {
TEST_F(CompactionServiceTest, ConcurrentCompaction) {
Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = 100;
options.max_background_jobs = 20;
Expand All @@ -579,7 +471,7 @@ TEST_P(CompactionServiceTest, ConcurrentCompaction) {

std::vector<std::thread> threads;
for (const auto& file : meta.levels[1].files) {
threads.push_back(std::thread([&]() {
threads.emplace_back(std::thread([&]() {
std::string fname = file.db_path + "/" + file.name;
ASSERT_OK(db_->CompactFiles(CompactionOptions(), {fname}, 2));
}));
Expand All @@ -604,12 +496,7 @@ TEST_P(CompactionServiceTest, ConcurrentCompaction) {
ASSERT_EQ(FilesPerLevel(), "0,0,10");
}

TEST_P(CompactionServiceTest, CompactionInfo) {
// only test compaction info for new compaction service interface
if (GetParam() != MyTestCompactionServiceType) {
return;
}

TEST_F(CompactionServiceTest, CompactionInfo) {
Options options = CurrentOptions();
ReopenWithCompactionService(&options);

Expand Down Expand Up @@ -688,7 +575,7 @@ TEST_P(CompactionServiceTest, CompactionInfo) {
ASSERT_EQ(Env::BOTTOM, info.priority);
}

TEST_P(CompactionServiceTest, FallbackLocalAuto) {
TEST_F(CompactionServiceTest, FallbackLocalAuto) {
Options options = CurrentOptions();
ReopenWithCompactionService(&options);

Expand Down Expand Up @@ -740,7 +627,7 @@ TEST_P(CompactionServiceTest, FallbackLocalAuto) {
ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES), 0);
}

TEST_P(CompactionServiceTest, FallbackLocalManual) {
TEST_F(CompactionServiceTest, FallbackLocalManual) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
ReopenWithCompactionService(&options);
Expand Down Expand Up @@ -798,12 +685,6 @@ TEST_P(CompactionServiceTest, FallbackLocalManual) {
VerifyTestData();
}

INSTANTIATE_TEST_CASE_P(
CompactionServiceTest, CompactionServiceTest,
::testing::Values(
TestCompactionServiceType::MyTestCompactionServiceType,
TestCompactionServiceType::MyTestCompactionServiceLegacyType));

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
Loading

0 comments on commit 31031c0

Please sign in to comment.