Skip to content

Commit

Permalink
CompactionJobTest
Browse files Browse the repository at this point in the history
Summary:
This is just a simple test that passes two files though a compaction. It shows the framework so that people can continue building new compaction *unit* tests.
In the future we might want to move some Compaction* tests from DBTest here. For example, CompactBetweenSnapshot seems a good candidate.

Hopefully this test can be simpler when we mock out VersionSet.

Test Plan: this is a test

Reviewers: ljin, rven, yhchiang, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D28449
  • Loading branch information
igorcanadi committed Nov 14, 2014
1 parent 7fe2470 commit 9be338c
Show file tree
Hide file tree
Showing 8 changed files with 247 additions and 17 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ TESTS = \
flush_job_test \
wal_manager_test \
listener_test \
write_batch_with_index_test
compaction_job_test

TOOLS = \
sst_dump \
Expand Down Expand Up @@ -425,6 +425,9 @@ write_batch_with_index_test: utilities/write_batch_with_index/write_batch_with_i
flush_job_test: db/flush_job_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/flush_job_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)

compaction_job_test: db/compaction_job_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/compaction_job_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)

wal_manager_test: db/wal_manager_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/wal_manager_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)

Expand Down
11 changes: 11 additions & 0 deletions db/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -328,4 +328,15 @@ uint64_t Compaction::OutputFilePreallocationSize(
return preallocation_size * 1.1;
}

Compaction* Compaction::TEST_NewCompaction(
int num_levels, int start_level, int out_level, uint64_t target_file_size,
uint64_t max_grandparent_overlap_bytes, uint32_t output_path_id,
CompressionType output_compression, bool seek_compaction,
bool deletion_compaction) {
return new Compaction(num_levels, start_level, out_level, target_file_size,
max_grandparent_overlap_bytes, output_path_id,
output_compression, seek_compaction,
deletion_compaction);
}

} // namespace rocksdb
10 changes: 10 additions & 0 deletions db/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,16 @@ class Compaction {
void SetupBottomMostLevel(VersionStorageInfo* vstorage, bool is_manual,
bool level0_only);

static Compaction* TEST_NewCompaction(
int num_levels, int start_level, int out_level, uint64_t target_file_size,
uint64_t max_grandparent_overlap_bytes, uint32_t output_path_id,
CompressionType output_compression, bool seek_compaction = false,
bool deletion_compaction = false);

CompactionInputFiles* TEST_GetInputFiles(int level) {
return &inputs_[level];
}

private:
friend class CompactionPicker;
friend class UniversalCompactionPicker;
Expand Down
2 changes: 2 additions & 0 deletions db/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ void CompactionJob::Prepare() {
// Generate file_levels_ for compaction berfore making Iterator
compact_->compaction->GenerateFileLevels();
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
assert(cfd != nullptr);
LogToBuffer(
log_buffer_, "[%s] Compacting %d@%d + %d@%d files, score %.2f",
cfd->GetName().c_str(), compact_->compaction->num_input_files(0),
Expand Down Expand Up @@ -990,6 +991,7 @@ Status CompactionJob::InstallCompactionResults(port::Mutex* db_mutex) {
inline SequenceNumber CompactionJob::findEarliestVisibleSnapshot(
SequenceNumber in, const std::vector<SequenceNumber>& snapshots,
SequenceNumber* prev_snapshot) {
assert(snapshots.size());
SequenceNumber prev __attribute__((unused)) = 0;
for (const auto cur : snapshots) {
assert(prev <= cur);
Expand Down
176 changes: 176 additions & 0 deletions db/compaction_job_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include <map>
#include <string>

#include "db/compaction_job.h"
#include "db/column_family.h"
#include "db/version_set.h"
#include "rocksdb/cache.h"
#include "rocksdb/options.h"
#include "rocksdb/db.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "table/mock_table.h"

namespace rocksdb {

// TODO(icanadi) Make it simpler once we mock out VersionSet
class CompactionJobTest {
public:
CompactionJobTest()
: env_(Env::Default()),
dbname_(test::TmpDir() + "/compaction_job_test"),
table_cache_(NewLRUCache(50000, 16, 8)),
versions_(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_controller_)),
shutting_down_(false),
mock_table_factory_(new mock::MockTableFactory()) {
ASSERT_OK(env_->CreateDirIfMissing(dbname_));
db_options_.db_paths.emplace_back(dbname_,
std::numeric_limits<uint64_t>::max());
NewDB();
std::vector<ColumnFamilyDescriptor> column_families;
cf_options_.table_factory = mock_table_factory_;
column_families.emplace_back(kDefaultColumnFamilyName, cf_options_);

mutable_cf_options_.RefreshDerivedOptions(ImmutableCFOptions(Options()));

ASSERT_OK(versions_->Recover(column_families, false));
}

std::string GenerateFileName(uint64_t file_number) {
FileMetaData meta;
std::vector<DbPath> db_paths;
db_paths.emplace_back(dbname_, std::numeric_limits<uint64_t>::max());
meta.fd = FileDescriptor(file_number, 0, 0);
return TableFileName(db_paths, meta.fd.GetNumber(), meta.fd.GetPathId());
}

// returns expected result after compaction
mock::MockFileContents CreateTwoFiles() {
mock::MockFileContents expected_results;
const int kKeysPerFile = 10000;
SequenceNumber sequence_number = 0;
for (int i = 0; i < 2; ++i) {
mock::MockFileContents contents;
SequenceNumber smallest_seqno, largest_seqno;
InternalKey smallest, largest;
for (int k = 0; k < kKeysPerFile; ++k) {
auto key = std::to_string(i * (kKeysPerFile / 2) + k);
auto value = std::to_string(i * kKeysPerFile + k);
InternalKey internal_key(key, ++sequence_number, kTypeValue);
if (k == 0) {
smallest = internal_key;
smallest_seqno = sequence_number;
} else if (k == kKeysPerFile - 1) {
largest = internal_key;
largest_seqno = sequence_number;
}
std::pair<std::string, std::string> key_value(
{internal_key.Encode().ToString(), value});
contents.insert(key_value);
if (i == 1 || k < kKeysPerFile / 2) {
expected_results.insert(key_value);
}
}

uint64_t file_number = versions_->NewFileNumber();
ASSERT_OK(mock_table_factory_->CreateMockTable(
env_, GenerateFileName(file_number), std::move(contents)));

VersionEdit edit;
edit.AddFile(0, file_number, 0, 10, smallest, largest, smallest_seqno,
largest_seqno);

mutex_.Lock();
versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
mutable_cf_options_, &edit, &mutex_);
mutex_.Unlock();
}
versions_->SetLastSequence(sequence_number);
return expected_results;
}

void NewDB() {
VersionEdit new_db;
new_db.SetLogNumber(0);
new_db.SetNextFile(2);
new_db.SetLastSequence(0);

const std::string manifest = DescriptorFileName(dbname_, 1);
unique_ptr<WritableFile> file;
Status s = env_->NewWritableFile(
manifest, &file, env_->OptimizeForManifestWrite(env_options_));
ASSERT_OK(s);
{
log::Writer log(std::move(file));
std::string record;
new_db.EncodeTo(&record);
s = log.AddRecord(record);
}
ASSERT_OK(s);
// Make "CURRENT" file that points to the new manifest file.
s = SetCurrentFile(env_, dbname_, 1, nullptr);
}

Env* env_;
std::string dbname_;
EnvOptions env_options_;
MutableCFOptions mutable_cf_options_;
std::shared_ptr<Cache> table_cache_;
WriteController write_controller_;
DBOptions db_options_;
ColumnFamilyOptions cf_options_;
std::unique_ptr<VersionSet> versions_;
port::Mutex mutex_;
std::atomic<bool> shutting_down_;
std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
};

TEST(CompactionJobTest, Simple) {
auto cfd = versions_->GetColumnFamilySet()->GetDefault();

auto expected_results = CreateTwoFiles();

auto files = cfd->current()->storage_info()->LevelFiles(0);
ASSERT_EQ(2U, files.size());

std::unique_ptr<Compaction> compaction(Compaction::TEST_NewCompaction(
7, 0, 1, 1024 * 1024, 10, 0, kNoCompression));
compaction->SetInputVersion(cfd->current());

auto compaction_input_files = compaction->TEST_GetInputFiles(0);
compaction_input_files->level = 0;
compaction_input_files->files.push_back(files[0]);
compaction_input_files->files.push_back(files[1]);

SnapshotList snapshots;
int yield_callback_called = 0;
std::function<uint64_t()> yield_callback = [&]() {
yield_callback_called++;
return 0;
};
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
mutex_.Lock();
CompactionJob compaction_job(
compaction.get(), db_options_, *cfd->GetLatestMutableCFOptions(),
env_options_, versions_.get(), &shutting_down_, &log_buffer, nullptr,
nullptr, &snapshots, true, table_cache_, std::move(yield_callback));
compaction_job.Prepare();
mutex_.Unlock();
ASSERT_OK(compaction_job.Run());
mutex_.Lock();
compaction_job.Install(Status::OK(), &mutex_);
mutex_.Unlock();

mock_table_factory_->AssertLatestFile(expected_results);
ASSERT_EQ(yield_callback_called, 20000);
}

} // namespace rocksdb

int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }
4 changes: 2 additions & 2 deletions db/flush_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class FlushJobTest {
versions_(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_controller_)),
shutting_down_(false),
mock_table_factory_(new MockTableFactory()) {
mock_table_factory_(new mock::MockTableFactory()) {
ASSERT_OK(env_->CreateDirIfMissing(dbname_));
db_options_.db_paths.emplace_back(dbname_,
std::numeric_limits<uint64_t>::max());
Expand Down Expand Up @@ -73,7 +73,7 @@ class FlushJobTest {
std::unique_ptr<VersionSet> versions_;
port::Mutex mutex_;
std::atomic<bool> shutting_down_;
std::shared_ptr<MockTableFactory> mock_table_factory_;
std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
};

TEST(FlushJobTest, Empty) {
Expand Down
25 changes: 23 additions & 2 deletions table/mock_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "util/coding.h"

namespace rocksdb {
namespace mock {

Iterator* MockTableReader::NewIterator(const ReadOptions&, Arena* arena) {
return new MockTableIterator(table_);
Expand Down Expand Up @@ -70,6 +71,19 @@ TableBuilder* MockTableFactory::NewTableBuilder(
return new MockTableBuilder(id, &file_system_);
}

Status MockTableFactory::CreateMockTable(Env* env, const std::string& fname,
MockFileContents file_contents) {
std::unique_ptr<WritableFile> file;
auto s = env->NewWritableFile(fname, &file, EnvOptions());
if (!s.ok()) {
return s;
}

uint32_t id = GetAndWriteNextID(file.get());
file_system_.files.insert({id, std::move(file_contents)});
return Status::OK();
}

uint32_t MockTableFactory::GetAndWriteNextID(WritableFile* file) const {
uint32_t next_id = next_id_.fetch_add(1);
char buf[4];
Expand All @@ -86,10 +100,17 @@ uint32_t MockTableFactory::GetIDFromFile(RandomAccessFile* file) const {
return DecodeFixed32(buf);
}

void MockTableFactory::AssertSingleFile(
const std::map<std::string, std::string>& file_contents) {
void MockTableFactory::AssertSingleFile(const MockFileContents& file_contents) {
ASSERT_EQ(file_system_.files.size(), 1U);
ASSERT_TRUE(file_contents == file_system_.files.begin()->second);
}

void MockTableFactory::AssertLatestFile(const MockFileContents& file_contents) {
ASSERT_GE(file_system_.files.size(), 1U);
auto latest = file_system_.files.end();
--latest;
ASSERT_TRUE(file_contents == latest->second);
}

} // namespace mock
} // namespace rocksdb
Loading

0 comments on commit 9be338c

Please sign in to comment.