Skip to content

Commit

Permalink
BlobDB GC: add SST <-> oldest blob file referenced mapping (facebook#…
Browse files Browse the repository at this point in the history
…5903)

Summary:
This is groundwork for adding garbage collection support to BlobDB. The
patch adds logic that keeps track of the oldest blob file referred to by
each SST file. The oldest blob file is identified during flush/
compaction (similarly to how the range of keys covered by the SST is
identified), and persisted in the manifest as a custom field of the new
file edit record. Blob indexes with TTL are ignored for the purposes of
identifying the oldest blob file (since such blob files are cleaned up by the
TTL logic in BlobDB).
Pull Request resolved: facebook#5903

Test Plan:
Added new unit tests; also ran db_bench in BlobDB mode, inspected the
manifest using ldb, and confirmed (by scanning the SST files using
sst_dump) that the value of the oldest blob file number field matches
the contents of the file for each SST.

Differential Revision: D17859997

Pulled By: ltamasi

fbshipit-source-id: 21662c137c6259a6af70446faaf3a9912c550e90
  • Loading branch information
ltamasi authored and facebook-github-bot committed Oct 14, 2019
1 parent a59dc84 commit 5f025ea
Show file tree
Hide file tree
Showing 21 changed files with 382 additions and 161 deletions.
2 changes: 0 additions & 2 deletions db/blob_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
#include "util/string_util.h"

namespace rocksdb {
namespace blob_db {

// BlobIndex is a pointer to the blob and metadata of the blob. The index is
// stored in base DB as ValueType::kTypeBlobIndex.
Expand Down Expand Up @@ -156,6 +155,5 @@ class BlobIndex {
CompressionType compression_ = kNoCompression;
};

} // namespace blob_db
} // namespace rocksdb
#endif // ROCKSDB_LITE
7 changes: 4 additions & 3 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ Status BuildTable(
if (!s.ok()) {
EventHelpers::LogAndNotifyTableFileCreationFinished(
event_logger, ioptions.listeners, dbname, column_family_name, fname,
job_id, meta->fd, tp, reason, s);
job_id, meta->fd, kInvalidBlobFileNumber, tp, reason, s);
return s;
}
file->SetIOPriority(io_priority);
Expand Down Expand Up @@ -157,8 +157,9 @@ Status BuildTable(
for (; c_iter.Valid(); c_iter.Next()) {
const Slice& key = c_iter.key();
const Slice& value = c_iter.value();
const ParsedInternalKey& ikey = c_iter.ikey();
builder->Add(key, value);
meta->UpdateBoundaries(key, c_iter.ikey().sequence);
meta->UpdateBoundaries(key, value, ikey.sequence, ikey.type);

// TODO(noetzli): Update stats after flush, too.
if (io_priority == Env::IO_HIGH &&
Expand Down Expand Up @@ -249,7 +250,7 @@ Status BuildTable(
// Output to event logger and fire events.
EventHelpers::LogAndNotifyTableFileCreationFinished(
event_logger, ioptions.listeners, dbname, column_family_name, fname,
job_id, meta->fd, tp, reason, s);
job_id, meta->fd, meta->oldest_blob_file_number, tp, reason, s);

return s;
}
Expand Down
12 changes: 8 additions & 4 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -933,8 +933,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
assert(sub_compact->current_output() != nullptr);
sub_compact->builder->Add(key, value);
sub_compact->current_output_file_size = sub_compact->builder->FileSize();
const ParsedInternalKey& ikey = c_iter->ikey();
sub_compact->current_output()->meta.UpdateBoundaries(
key, c_iter->ikey().sequence);
key, value, ikey.sequence, ikey.type);
sub_compact->num_output_records++;

// Close output file if it is big enough. Two possibilities determine it's
Expand Down Expand Up @@ -1349,17 +1350,20 @@ Status CompactionJob::FinishCompactionOutputFile(
}
std::string fname;
FileDescriptor output_fd;
uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
if (meta != nullptr) {
fname =
TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
meta->fd.GetNumber(), meta->fd.GetPathId());
output_fd = meta->fd;
oldest_blob_file_number = meta->oldest_blob_file_number;
} else {
fname = "(nil)";
}
EventHelpers::LogAndNotifyTableFileCreationFinished(
event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname,
job_id_, output_fd, tp, TableFileCreationReason::kCompaction, s);
job_id_, output_fd, oldest_blob_file_number, tp,
TableFileCreationReason::kCompaction, s);

#ifndef ROCKSDB_LITE
// Report new file to SstFileManagerImpl
Expand Down Expand Up @@ -1469,8 +1473,8 @@ Status CompactionJob::OpenCompactionOutputFile(
LogFlush(db_options_.info_log);
EventHelpers::LogAndNotifyTableFileCreationFinished(
event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(),
fname, job_id_, FileDescriptor(), TableProperties(),
TableFileCreationReason::kCompaction, s);
fname, job_id_, FileDescriptor(), kInvalidBlobFileNumber,
TableProperties(), TableFileCreationReason::kCompaction, s);
return s;
}

Expand Down
115 changes: 106 additions & 9 deletions db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <string>
#include <tuple>

#include "db/blob_index.h"
#include "db/column_family.h"
#include "db/compaction/compaction_job.h"
#include "db/db_impl/db_impl.h"
Expand Down Expand Up @@ -97,11 +98,34 @@ class CompactionJobTest : public testing::Test {
return TableFileName(db_paths, meta.fd.GetNumber(), meta.fd.GetPathId());
}

std::string KeyStr(const std::string& user_key, const SequenceNumber seq_num,
const ValueType t) {
static std::string KeyStr(const std::string& user_key,
const SequenceNumber seq_num, const ValueType t) {
return InternalKey(user_key, seq_num, t).Encode().ToString();
}

static std::string BlobStr(uint64_t blob_file_number, uint64_t offset,
uint64_t size) {
std::string blob_index;
BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size,
kNoCompression);
return blob_index;
}

static std::string BlobStrTTL(uint64_t blob_file_number, uint64_t offset,
uint64_t size, uint64_t expiration) {
std::string blob_index;
BlobIndex::EncodeBlobTTL(&blob_index, expiration, blob_file_number, offset,
size, kNoCompression);
return blob_index;
}

static std::string BlobStrInlinedTTL(const Slice& value,
uint64_t expiration) {
std::string blob_index;
BlobIndex::EncodeInlinedTTL(&blob_index, expiration, value);
return blob_index;
}

void AddMockFile(const stl_wrappers::KVMap& contents, int level = 0) {
assert(contents.size() > 0);

Expand All @@ -110,6 +134,7 @@ class CompactionJobTest : public testing::Test {
InternalKey smallest_key, largest_key;
SequenceNumber smallest_seqno = kMaxSequenceNumber;
SequenceNumber largest_seqno = 0;
uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
for (auto kv : contents) {
ParsedInternalKey key;
std::string skey;
Expand All @@ -132,6 +157,24 @@ class CompactionJobTest : public testing::Test {
}

first_key = false;

if (key.type == kTypeBlobIndex) {
BlobIndex blob_index;
const Status s = blob_index.DecodeFrom(value);
if (!s.ok()) {
continue;
}

if (blob_index.IsInlined() || blob_index.HasTTL() ||
blob_index.file_number() == kInvalidBlobFileNumber) {
continue;
}

if (oldest_blob_file_number == kInvalidBlobFileNumber ||
oldest_blob_file_number > blob_index.file_number()) {
oldest_blob_file_number = blob_index.file_number();
}
}
}

uint64_t file_number = versions_->NewFileNumber();
Expand All @@ -140,7 +183,7 @@ class CompactionJobTest : public testing::Test {

VersionEdit edit;
edit.AddFile(level, file_number, 0, 10, smallest_key, largest_key,
smallest_seqno, largest_seqno, false);
smallest_seqno, largest_seqno, false, oldest_blob_file_number);

mutex_.Lock();
versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
Expand Down Expand Up @@ -250,7 +293,8 @@ class CompactionJobTest : public testing::Test {
const stl_wrappers::KVMap& expected_results,
const std::vector<SequenceNumber>& snapshots = {},
SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber,
int output_level = 1, bool verify = true) {
int output_level = 1, bool verify = true,
uint64_t expected_oldest_blob_file_number = kInvalidBlobFileNumber) {
auto cfd = versions_->GetColumnFamilySet()->GetDefault();

size_t num_input_files = 0;
Expand Down Expand Up @@ -296,15 +340,20 @@ class CompactionJobTest : public testing::Test {
mutex_.Unlock();

if (verify) {
if (expected_results.size() == 0) {
ASSERT_GE(compaction_job_stats_.elapsed_micros, 0U);
ASSERT_EQ(compaction_job_stats_.num_input_files, num_input_files);
ASSERT_GE(compaction_job_stats_.elapsed_micros, 0U);
ASSERT_EQ(compaction_job_stats_.num_input_files, num_input_files);

if (expected_results.empty()) {
ASSERT_EQ(compaction_job_stats_.num_output_files, 0U);
} else {
ASSERT_GE(compaction_job_stats_.elapsed_micros, 0U);
ASSERT_EQ(compaction_job_stats_.num_input_files, num_input_files);
ASSERT_EQ(compaction_job_stats_.num_output_files, 1U);
mock_table_factory_->AssertLatestFile(expected_results);

auto output_files =
cfd->current()->storage_info()->LevelFiles(output_level);
ASSERT_EQ(output_files.size(), 1);
ASSERT_EQ(output_files[0]->oldest_blob_file_number,
expected_oldest_blob_file_number);
}
}
}
Expand Down Expand Up @@ -960,6 +1009,54 @@ TEST_F(CompactionJobTest, CorruptionAfterDeletion) {
RunCompaction({files}, expected_results);
}

TEST_F(CompactionJobTest, OldestBlobFileNumber) {
NewDB();

// Note: blob1 is inlined TTL, so it will not be considered for the purposes
// of identifying the oldest referenced blob file. Similarly, blob6 will be
// ignored because it has TTL and hence refers to a TTL blob file.
const stl_wrappers::KVMap::value_type blob1(
KeyStr("a", 1U, kTypeBlobIndex), BlobStrInlinedTTL("foo", 1234567890ULL));
const stl_wrappers::KVMap::value_type blob2(KeyStr("b", 2U, kTypeBlobIndex),
BlobStr(59, 123456, 999));
const stl_wrappers::KVMap::value_type blob3(KeyStr("c", 3U, kTypeBlobIndex),
BlobStr(138, 1000, 1 << 8));
auto file1 = mock::MakeMockFile({blob1, blob2, blob3});
AddMockFile(file1);

const stl_wrappers::KVMap::value_type blob4(KeyStr("d", 4U, kTypeBlobIndex),
BlobStr(199, 3 << 10, 1 << 20));
const stl_wrappers::KVMap::value_type blob5(KeyStr("e", 5U, kTypeBlobIndex),
BlobStr(19, 6789, 333));
const stl_wrappers::KVMap::value_type blob6(
KeyStr("f", 6U, kTypeBlobIndex),
BlobStrTTL(5, 2048, 1 << 7, 1234567890ULL));
auto file2 = mock::MakeMockFile({blob4, blob5, blob6});
AddMockFile(file2);

const stl_wrappers::KVMap::value_type expected_blob1(
KeyStr("a", 0U, kTypeBlobIndex), blob1.second);
const stl_wrappers::KVMap::value_type expected_blob2(
KeyStr("b", 0U, kTypeBlobIndex), blob2.second);
const stl_wrappers::KVMap::value_type expected_blob3(
KeyStr("c", 0U, kTypeBlobIndex), blob3.second);
const stl_wrappers::KVMap::value_type expected_blob4(
KeyStr("d", 0U, kTypeBlobIndex), blob4.second);
const stl_wrappers::KVMap::value_type expected_blob5(
KeyStr("e", 0U, kTypeBlobIndex), blob5.second);
const stl_wrappers::KVMap::value_type expected_blob6(
KeyStr("f", 0U, kTypeBlobIndex), blob6.second);
auto expected_results =
mock::MakeMockFile({expected_blob1, expected_blob2, expected_blob3,
expected_blob4, expected_blob5, expected_blob6});

SetLastSequence(6U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results, std::vector<SequenceNumber>(),
kMaxSequenceNumber, /* output_level */ 1, /* verify */ true,
/* expected_oldest_blob_file_number */ 19);
}

} // namespace rocksdb

int main(int argc, char** argv) {
Expand Down
12 changes: 5 additions & 7 deletions db/compaction/compaction_picker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,13 @@ class CompactionPickerTest : public testing::Test {
SequenceNumber smallest_seq = 100, SequenceNumber largest_seq = 100,
size_t compensated_file_size = 0) {
assert(level < vstorage_->num_levels());
FileMetaData* f = new FileMetaData;
f->fd = FileDescriptor(file_number, path_id, file_size);
f->smallest = InternalKey(smallest, smallest_seq, kTypeValue);
f->largest = InternalKey(largest, largest_seq, kTypeValue);
f->fd.smallest_seqno = smallest_seq;
f->fd.largest_seqno = largest_seq;
FileMetaData* f = new FileMetaData(
file_number, path_id, file_size,
InternalKey(smallest, smallest_seq, kTypeValue),
InternalKey(largest, largest_seq, kTypeValue), smallest_seq,
largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber);
f->compensated_file_size =
(compensated_file_size != 0) ? compensated_file_size : file_size;
f->refs = 0;
vstorage_->AddFile(level, f);
files_.emplace_back(f);
file_map_.insert({file_number, {f, level}});
Expand Down
5 changes: 3 additions & 2 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1257,7 +1257,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction);
f->marked_for_compaction, f->oldest_blob_file_number);
}
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
Expand Down Expand Up @@ -2657,7 +2657,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
f->largest, f->fd.smallest_seqno,
f->fd.largest_seqno, f->marked_for_compaction);
f->fd.largest_seqno, f->marked_for_compaction,
f->oldest_blob_file_number);

ROCKS_LOG_BUFFER(
log_buffer,
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_experimental.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
edit.AddFile(target_level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction);
f->marked_for_compaction, f->oldest_blob_file_number);
}

status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
meta.fd.GetFileSize(), meta.smallest, meta.largest,
meta.fd.smallest_seqno, meta.fd.largest_seqno,
meta.marked_for_compaction);
meta.marked_for_compaction, meta.oldest_blob_file_number);
}

InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
Expand Down
9 changes: 7 additions & 2 deletions db/event_helpers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished(
const std::vector<std::shared_ptr<EventListener>>& listeners,
const std::string& db_name, const std::string& cf_name,
const std::string& file_path, int job_id, const FileDescriptor& fd,
const TableProperties& table_properties, TableFileCreationReason reason,
const Status& s) {
uint64_t oldest_blob_file_number, const TableProperties& table_properties,
TableFileCreationReason reason, const Status& s) {
if (s.ok() && event_logger) {
JSONWriter jwriter;
AppendCurrentTime(&jwriter);
Expand Down Expand Up @@ -129,6 +129,11 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished(
}
jwriter.EndObject();
}

if (oldest_blob_file_number != kInvalidBlobFileNumber) {
jwriter << "oldest_blob_file_number" << oldest_blob_file_number;
}

jwriter.EndObject();

event_logger->Log(jwriter);
Expand Down
4 changes: 2 additions & 2 deletions db/event_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ class EventHelpers {
const std::vector<std::shared_ptr<EventListener>>& listeners,
const std::string& db_name, const std::string& cf_name,
const std::string& file_path, int job_id, const FileDescriptor& fd,
const TableProperties& table_properties, TableFileCreationReason reason,
const Status& s);
uint64_t oldest_blob_file_number, const TableProperties& table_properties,
TableFileCreationReason reason, const Status& s);
static void LogAndNotifyTableFileDeletion(
EventLogger* event_logger, int job_id,
uint64_t file_number, const std::string& file_path,
Expand Down
3 changes: 2 additions & 1 deletion db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,11 @@ Status ExternalSstFileIngestionJob::Run() {
if (!status.ok()) {
return status;
}

edit_.AddFile(f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(),
f.fd.GetFileSize(), f.smallest_internal_key,
f.largest_internal_key, f.assigned_seqno, f.assigned_seqno,
false);
false, kInvalidBlobFileNumber);
}
return status;
}
Expand Down
2 changes: 1 addition & 1 deletion db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ Status FlushJob::WriteLevel0Table() {
edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
meta_.marked_for_compaction);
meta_.marked_for_compaction, meta_.oldest_blob_file_number);
}

// Note that here we treat flush as level 0 compaction in internal stats
Expand Down
Loading

0 comments on commit 5f025ea

Please sign in to comment.