Skip to content

Commit

Permalink
Support ingest_behind for IngestExternalFile
Browse files Browse the repository at this point in the history
Summary:
First cut for early review; there are few conceptual points to answer and some code structure issues.

For conceptual points -

 - restriction-wise, we're going to disallow ingest_behind if (use_seqno_zero_out=true || disable_auto_compaction=false), the user is responsible to properly open and close DB with required params
 - we wanted to ingest into reserved bottom most level. Should we fail fast if bottom level isn't empty, or should we attempt to ingest if file fits there key-ranges-wise?
 - Modifying AssignLevelForIngestedFile seems the place we we'd handle that.

On code structure - going to refactor GenerateAndAddExternalFile call in the test class to allow passing instance of IngestionOptions, that's just going to incur lots of changes at callsites.
Closes facebook#2144

Differential Revision: D4873732

Pulled By: lightmark

fbshipit-source-id: 81cb698106b68ef8797f564453651d50900e153a
  • Loading branch information
mikhail-antonov authored and facebook-github-bot committed May 17, 2017
1 parent 01ab7b5 commit ba685a4
Show file tree
Hide file tree
Showing 22 changed files with 325 additions and 33 deletions.
6 changes: 6 additions & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
result.num_levels < 2) {
result.num_levels = 2;
}

if (result.compaction_style == kCompactionStyleUniversal &&
db_options.allow_ingest_behind && result.num_levels < 3) {
result.num_levels = 3;
}

if (result.max_write_buffer_number < 2) {
result.max_write_buffer_number = 2;
}
Expand Down
3 changes: 2 additions & 1 deletion db/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,8 @@ void CompactionIterator::PrepareOutput() {

// This is safe for TransactionDB write-conflict checking since transactions
// only care about sequence number larger than any active snapshots.
if (bottommost_level_ && valid_ && ikey_.sequence <= earliest_snapshot_ &&
if ((compaction_ != nullptr && !compaction_->allow_ingest_behind()) &&
bottommost_level_ && valid_ && ikey_.sequence <= earliest_snapshot_ &&
ikey_.type != kTypeMerge &&
!cmp_->Equal(compaction_->GetLargestUserKey(), ikey_.user_key)) {
assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
Expand Down
4 changes: 4 additions & 0 deletions db/compaction_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/range_del_aggregator.h"
#include "options/cf_options.h"
#include "rocksdb/compaction_filter.h"

namespace rocksdb {
Expand Down Expand Up @@ -48,6 +49,9 @@ class CompactionIterator {
virtual Slice GetLargestUserKey() const {
return compaction_->GetLargestUserKey();
}
virtual bool allow_ingest_behind() const {
return compaction_->immutable_cf_options()->allow_ingest_behind;
}

protected:
CompactionProxy() = default;
Expand Down
1 change: 1 addition & 0 deletions db/compaction_iterator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ class FakeCompaction : public CompactionIterator::CompactionProxy {
virtual Slice GetLargestUserKey() const {
return "\xff\xff\xff\xff\xff\xff\xff\xff\xff";
}
virtual bool allow_ingest_behind() const { return false; }

bool key_not_exists_beyond_output_level = false;
};
Expand Down
6 changes: 5 additions & 1 deletion db/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,11 @@ Compaction* CompactionPicker::CompactRange(
// files together to the last level.
assert(vstorage->num_levels() > 1);
// DBImpl::CompactRange() set output level to be the last level
assert(output_level == vstorage->num_levels() - 1);
if (ioptions_.allow_ingest_behind) {
assert(output_level == vstorage->num_levels() - 2);
} else {
assert(output_level == vstorage->num_levels() - 1);
}
// DBImpl::RunManualCompaction will make full range for universal compaction
assert(begin == nullptr);
assert(end == nullptr);
Expand Down
29 changes: 29 additions & 0 deletions db/compaction_picker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,35 @@ TEST_F(CompactionPickerTest, NeedsCompactionUniversal) {
vstorage_->CompactionScore(0) >= 1);
}
}

TEST_F(CompactionPickerTest, CompactionUniversalIngestBehindReservedLevel) {
const uint64_t kFileSize = 100000;
NewVersionStorage(1, kCompactionStyleUniversal);
ioptions_.allow_ingest_behind = true;
ioptions_.num_levels = 3;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
// must return false when there's no files.
ASSERT_EQ(universal_compaction_picker.NeedsCompaction(vstorage_.get()),
false);

NewVersionStorage(3, kCompactionStyleUniversal);

Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
Add(0, 2U, "201", "250", kFileSize, 0, 401, 450);
Add(0, 4U, "260", "300", kFileSize, 0, 260, 300);
Add(1, 5U, "100", "151", kFileSize, 0, 200, 251);
Add(1, 3U, "301", "350", kFileSize, 0, 101, 150);
Add(2, 6U, "120", "200", kFileSize, 0, 20, 100);

UpdateVersionStorageInfo();

std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));

// output level should be the one above the bottom-most
ASSERT_EQ(1, compaction->output_level());
}
// Tests if the files can be trivially moved in multi level
// universal compaction when allow_trivial_move option is set
// In this test as the input files overlaps, they cannot
Expand Down
20 changes: 17 additions & 3 deletions db/compaction_picker_universal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,13 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns(
output_level = sorted_runs[first_index_after].level - 1;
}

// last level is reserved for the files ingested behind
if (ioptions_.allow_ingest_behind &&
(output_level == vstorage->num_levels() - 1)) {
assert(output_level > 1);
output_level--;
}

std::vector<CompactionInputFiles> inputs(vstorage->num_levels());
for (size_t i = 0; i < inputs.size(); ++i) {
inputs[i].level = start_level + static_cast<int>(i);
Expand Down Expand Up @@ -719,13 +726,20 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSizeAmp(
cf_name.c_str(), file_num_buf);
}

// output files at the bottom most level, unless it's reserved
int output_level = vstorage->num_levels() - 1;
// last level is reserved for the files ingested behind
if (ioptions_.allow_ingest_behind) {
assert(output_level > 1);
output_level--;
}

return new Compaction(
vstorage, ioptions_, mutable_cf_options, std::move(inputs),
vstorage->num_levels() - 1,
mutable_cf_options.MaxFileSizeForLevel(vstorage->num_levels() - 1),
output_level, mutable_cf_options.MaxFileSizeForLevel(output_level),
/* max_grandparent_overlap_bytes */ LLONG_MAX, path_id,
GetCompressionType(ioptions_, vstorage, mutable_cf_options,
vstorage->num_levels() - 1, 1),
output_level, 1),
/* grandparents */ {}, /* is manual */ false, score,
false /* deletion_compaction */,
CompactionReason::kUniversalSizeAmplification);
Expand Down
9 changes: 9 additions & 0 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2614,6 +2614,15 @@ Status DBImpl::IngestExternalFile(
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();

// Ingest should immediately fail if ingest_behind is requested,
// but the DB doesn't support it.
if (ingestion_options.ingest_behind) {
if (!immutable_db_options_.allow_ingest_behind) {
return Status::InvalidArgument(
"Can't ingest_behind file in DB with allow_ingest_behind=false");
}
}

ExternalSstFileIngestionJob ingestion_job(env_, versions_.get(), cfd,
immutable_db_options_, env_options_,
&snapshots_, ingestion_options);
Expand Down
8 changes: 6 additions & 2 deletions db/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,14 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
cfd->NumberLevels() > 1) {
// Always compact all files together.
final_output_level = cfd->NumberLevels() - 1;
// if bottom most level is reserved
if (immutable_db_options_.allow_ingest_behind) {
final_output_level--;
}
s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
cfd->NumberLevels() - 1, options.target_path_id,
final_output_level, options.target_path_id,
begin, end, exclusive);
final_output_level = cfd->NumberLevels() - 1;
} else {
for (int level = 0; level <= max_level_with_files; level++) {
int output_level;
Expand Down
76 changes: 58 additions & 18 deletions db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,13 @@ Status ExternalSstFileIngestionJob::Run() {

for (IngestedFileInfo& f : files_to_ingest_) {
SequenceNumber assigned_seqno = 0;
status = AssignLevelAndSeqnoForIngestedFile(
super_version, force_global_seqno, cfd_->ioptions()->compaction_style,
&f, &assigned_seqno);
if (ingestion_options_.ingest_behind) {
status = CheckLevelForIngestedBehindFile(&f);
} else {
status = AssignLevelAndSeqnoForIngestedFile(
super_version, force_global_seqno, cfd_->ioptions()->compaction_style,
&f, &assigned_seqno);
}
if (!status.ok()) {
return status;
}
Expand Down Expand Up @@ -408,30 +412,18 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
Arena arena;
ReadOptions ro;
ro.total_order_seek = true;

int target_level = 0;
auto* vstorage = cfd_->current()->storage_info();

for (int lvl = 0; lvl < cfd_->NumberLevels(); lvl++) {
if (lvl > 0 && lvl < vstorage->base_level()) {
continue;
}

if (vstorage->NumLevelFiles(lvl) > 0) {
bool overlap_with_level = false;
MergeIteratorBuilder merge_iter_builder(&cfd_->internal_comparator(),
&arena);
RangeDelAggregator range_del_agg(cfd_->internal_comparator(),
{} /* snapshots */);
sv->current->AddIteratorsForLevel(ro, env_options_, &merge_iter_builder,
lvl, &range_del_agg);
if (!range_del_agg.IsEmpty()) {
return Status::NotSupported(
"file ingestion with range tombstones is currently unsupported");
}
ScopedArenaIterator level_iter(merge_iter_builder.Finish());

status = IngestedFileOverlapWithIteratorRange(
file_to_ingest, level_iter.get(), &overlap_with_level);
status = IngestedFileOverlapWithLevel(sv, file_to_ingest, lvl,
&overlap_with_level);
if (!status.ok()) {
return status;
}
Expand Down Expand Up @@ -478,6 +470,33 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
return status;
}

Status ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile(
IngestedFileInfo* file_to_ingest) {
auto* vstorage = cfd_->current()->storage_info();
// first check if new files fit in the bottommost level
int bottom_lvl = cfd_->NumberLevels() - 1;
if(!IngestedFileFitInLevel(file_to_ingest, bottom_lvl)) {
return Status::InvalidArgument(
"Can't ingest_behind file as it doesn't fit "
"at the bottommost level!");
}

// second check if despite allow_ingest_behind=true we still have 0 seqnums
// at some upper level
for (int lvl = 0; lvl < cfd_->NumberLevels() - 1; lvl++) {
for (auto file : vstorage->LevelFiles(lvl)) {
if (file->smallest_seqno == 0) {
return Status::InvalidArgument(
"Can't ingest_behind file as despite allow_ingest_behind=true "
"there are files with 0 seqno in database at upper levels!");
}
}
}

file_to_ingest->picked_level = bottom_lvl;
return Status::OK();
}

Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile(
IngestedFileInfo* file_to_ingest, SequenceNumber seqno) {
if (file_to_ingest->original_seqno == seqno) {
Expand Down Expand Up @@ -564,6 +583,27 @@ bool ExternalSstFileIngestionJob::IngestedFileFitInLevel(
return true;
}

Status ExternalSstFileIngestionJob::IngestedFileOverlapWithLevel(
SuperVersion* sv, IngestedFileInfo* file_to_ingest, int lvl,
bool* overlap_with_level) {
Arena arena;
ReadOptions ro;
ro.total_order_seek = true;
MergeIteratorBuilder merge_iter_builder(&cfd_->internal_comparator(),
&arena);
RangeDelAggregator range_del_agg(cfd_->internal_comparator(),
{} /* snapshots */);
sv->current->AddIteratorsForLevel(ro, env_options_, &merge_iter_builder,
lvl, &range_del_agg);
if (!range_del_agg.IsEmpty()) {
return Status::NotSupported(
"file ingestion with range tombstones is currently unsupported");
}
ScopedArenaIterator level_iter(merge_iter_builder.Finish());
return IngestedFileOverlapWithIteratorRange(
file_to_ingest, level_iter.get(), overlap_with_level);
}

} // namespace rocksdb

#endif // !ROCKSDB_LITE
11 changes: 11 additions & 0 deletions db/external_sst_file_ingestion_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ class ExternalSstFileIngestionJob {
IngestedFileInfo* file_to_ingest,
SequenceNumber* assigned_seqno);

// File that we want to ingest behind always goes to the lowest level;
// we just check that it fits in the level, that DB allows ingest_behind,
// and that we don't have 0 seqnums at the upper levels.
// REQUIRES: Mutex held
Status CheckLevelForIngestedBehindFile(IngestedFileInfo* file_to_ingest);

// Set the file global sequence number to `seqno`
Status AssignGlobalSeqnoForIngestedFile(IngestedFileInfo* file_to_ingest,
SequenceNumber seqno);
Expand All @@ -135,6 +141,11 @@ class ExternalSstFileIngestionJob {
const IngestedFileInfo* file_to_ingest, InternalIterator* iter,
bool* overlap);

// Check if `file_to_ingest` key range overlap with level
// REQUIRES: Mutex held
Status IngestedFileOverlapWithLevel(SuperVersion* sv,
IngestedFileInfo* file_to_ingest, int lvl, bool* overlap_with_level);

// Check if `file_to_ingest` can fit in level `level`
// REQUIRES: Mutex held
bool IngestedFileFitInLevel(const IngestedFileInfo* file_to_ingest,
Expand Down
Loading

0 comments on commit ba685a4

Please sign in to comment.