Skip to content

Commit

Permalink
Compare the number of input keys and processed keys for compactions (f…
Browse files Browse the repository at this point in the history
…acebook#11571)

Summary:
... to improve data integrity validation during compaction.

A new option `compaction_verify_record_count` is introduced for this verification and is enabled by default. One exception when the verification is not done is when a compaction filter returns kRemoveAndSkipUntil which can cause CompactionIterator to seek until some key and hence not able to keep track of the number of keys processed.

For expected number of input keys, we sum over the number of total keys - number of range tombstones across compaction input files (`CompactionJob::UpdateCompactionStats()`). Table properties are consulted if `FileMetaData` is not initialized for some input file. Since table properties for all input files were also constructed during `DBImpl::NotifyOnCompactionBegin()`, `Compaction::GetTableProperties()` is introduced to reduce duplicated code.

For actual number of keys processed, each subcompaction will record its number of keys processed to `sub_compact->compaction_job_stats.num_input_records` and aggregated when all subcompactions finish (`CompactionJob::AggregateCompactionStats()`). In the case when some subcompaction encountered kRemoveAndSkipUntil from compaction filter and does not have accurate count, it propagates this information through `sub_compact->compaction_job_stats.has_num_input_records`.

Pull Request resolved: facebook#11571

Test Plan:
* Add a new unit test `DBCompactionTest.VerifyRecordCount` for the corruption case.
* All other unit tests for non-corrupted case.
* Ran crash test for a few hours: `python3 ./tools/db_crashtest.py whitebox --simple`

Reviewed By: ajkr

Differential Revision: D47131965

Pulled By: cbi42

fbshipit-source-id: cc8e94565dd526c4347e9d3843ecf32f6727af92
  • Loading branch information
cbi42 authored and facebook-github-bot committed Jul 28, 2023
1 parent 5dd8c11 commit 6a0f637
Show file tree
Hide file tree
Showing 23 changed files with 270 additions and 78 deletions.
4 changes: 3 additions & 1 deletion db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ Status BuildTable(
blob_file_builder.get(), ioptions.allow_data_in_errors,
ioptions.enforce_single_del_contracts,
/*manual_compaction_canceled=*/kManualCompactionCanceledFalse,
true /* must_count_input_entries */,
/*compaction=*/nullptr, compaction_filter.get(),
/*shutting_down=*/nullptr, db_options.info_log, full_history_ts_low);

Expand Down Expand Up @@ -286,8 +287,9 @@ Status BuildTable(
TEST_SYNC_POINT("BuildTable:BeforeFinishBuildTable");
const bool empty = builder->IsEmpty();
if (num_input_entries != nullptr) {
assert(c_iter.HasNumInputEntryScanned());
*num_input_entries =
c_iter.num_input_entry_scanned() + num_unfragmented_tombstones;
c_iter.NumInputEntryScanned() + num_unfragmented_tombstones;
}
if (!s.ok() || empty) {
builder->Abandon();
Expand Down
29 changes: 29 additions & 0 deletions db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <vector>

#include "db/column_family.h"
#include "logging/logging.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/sst_partitioner.h"
#include "test_util/sync_point.h"
Expand Down Expand Up @@ -203,6 +204,34 @@ bool Compaction::IsFullCompaction(
return num_files_in_compaction == total_num_files;
}

const TablePropertiesCollection& Compaction::GetTableProperties() {
if (!input_table_properties_initialized_) {
const ReadOptions read_options(Env::IOActivity::kCompaction);
for (size_t i = 0; i < num_input_levels(); ++i) {
for (const FileMetaData* fmd : *(this->inputs(i))) {
std::shared_ptr<const TableProperties> tp;
std::string file_name =
TableFileName(immutable_options_.cf_paths, fmd->fd.GetNumber(),
fmd->fd.GetPathId());
Status s = input_version_->GetTableProperties(read_options, &tp, fmd,
&file_name);
if (s.ok()) {
table_properties_[file_name] = tp;
} else {
ROCKS_LOG_ERROR(immutable_options_.info_log,
"Unable to load table properties for file %" PRIu64
" --- %s\n",
fmd->fd.GetNumber(), s.ToString().c_str());
}
}
}

input_table_properties_initialized_ = true;
};

return table_properties_;
}

Compaction::Compaction(
VersionStorageInfo* vstorage, const ImmutableOptions& _immutable_options,
const MutableCFOptions& _mutable_cf_options,
Expand Down
19 changes: 12 additions & 7 deletions db/compaction/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,12 +326,16 @@ class Compaction {
int output_level, VersionStorageInfo* vstorage,
const std::vector<CompactionInputFiles>& inputs);

TablePropertiesCollection GetOutputTableProperties() const {
return output_table_properties_;
}

void SetOutputTableProperties(TablePropertiesCollection tp) {
output_table_properties_ = std::move(tp);
// If called before a compaction finishes, will return
// table properties of all compaction input files.
// If called after a compaction finished, will return
// table properties of all compaction input and output files.
const TablePropertiesCollection& GetTableProperties();

void SetOutputTableProperties(
const std::string& file_name,
const std::shared_ptr<const TableProperties>& tp) {
table_properties_[file_name] = tp;
}

Slice GetSmallestUserKey() const { return smallest_user_key_; }
Expand Down Expand Up @@ -518,8 +522,9 @@ class Compaction {
// Does input compression match the output compression?
bool InputCompressionMatchesOutput() const;

bool input_table_properties_initialized_ = false;
// table properties of output files
TablePropertiesCollection output_table_properties_;
TablePropertiesCollection table_properties_;

// smallest user keys in compaction
// includes timestamp if user-defined timestamp is enabled.
Expand Down
13 changes: 7 additions & 6 deletions db/compaction/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ CompactionIterator::CompactionIterator(
BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
bool enforce_single_del_contracts,
const std::atomic<bool>& manual_compaction_canceled,
const Compaction* compaction, const CompactionFilter* compaction_filter,
bool must_count_input_entries, const Compaction* compaction,
const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
const std::shared_ptr<Logger> info_log,
const std::string* full_history_ts_low,
Expand All @@ -45,8 +46,9 @@ CompactionIterator::CompactionIterator(
manual_compaction_canceled,
std::unique_ptr<CompactionProxy>(
compaction ? new RealCompaction(compaction) : nullptr),
compaction_filter, shutting_down, info_log, full_history_ts_low,
preserve_time_min_seqno, preclude_last_level_min_seqno) {}
must_count_input_entries, compaction_filter, shutting_down, info_log,
full_history_ts_low, preserve_time_min_seqno,
preclude_last_level_min_seqno) {}

CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
Expand All @@ -58,15 +60,14 @@ CompactionIterator::CompactionIterator(
BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
bool enforce_single_del_contracts,
const std::atomic<bool>& manual_compaction_canceled,
std::unique_ptr<CompactionProxy> compaction,
std::unique_ptr<CompactionProxy> compaction, bool must_count_input_entries,
const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
const std::shared_ptr<Logger> info_log,
const std::string* full_history_ts_low,
const SequenceNumber preserve_time_min_seqno,
const SequenceNumber preclude_last_level_min_seqno)
: input_(input, cmp,
!compaction || compaction->DoesInputReferenceBlobFiles()),
: input_(input, cmp, must_count_input_entries),
cmp_(cmp),
merge_helper_(merge_helper),
snapshots_(snapshots),
Expand Down
23 changes: 17 additions & 6 deletions db/compaction/compaction_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,18 @@ class SequenceIterWrapper : public InternalIterator {
bool Valid() const override { return inner_iter_->Valid(); }
Status status() const override { return inner_iter_->status(); }
void Next() override {
num_itered_++;
if (!inner_iter_->IsDeleteRangeSentinelKey()) {
num_itered_++;
}
inner_iter_->Next();
}
void Seek(const Slice& target) override {
if (!need_count_entries_) {
has_num_itered_ = false;
inner_iter_->Seek(target);
} else {
// For flush cases, we need to count total number of entries, so we
// do Next() rather than Seek().
// Need to count total number of entries,
// so we do Next() rather than Seek().
while (inner_iter_->Valid() &&
icmp_.Compare(inner_iter_->key(), target) < 0) {
Next();
Expand All @@ -62,7 +65,8 @@ class SequenceIterWrapper : public InternalIterator {
void SeekForPrev(const Slice& /* target */) override { assert(false); }
void SeekToLast() override { assert(false); }

uint64_t num_itered() const { return num_itered_; }
uint64_t NumItered() const { return num_itered_; }
bool HasNumItered() const { return has_num_itered_; }
bool IsDeleteRangeSentinelKey() const override {
assert(Valid());
return inner_iter_->IsDeleteRangeSentinelKey();
Expand All @@ -73,6 +77,7 @@ class SequenceIterWrapper : public InternalIterator {
InternalIterator* inner_iter_; // not owned
uint64_t num_itered_ = 0;
bool need_count_entries_;
bool has_num_itered_ = true;
};

class CompactionIterator {
Expand Down Expand Up @@ -189,6 +194,10 @@ class CompactionIterator {
const Compaction* compaction_;
};

// @param must_count_input_entries if true, `NumInputEntryScanned()` will
// return the number of input keys scanned. If false, `NumInputEntryScanned()`
// will return this number if no Seek was called on `input`. User should call
// `HasNumInputEntryScanned()` first in this case.
CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
Expand All @@ -199,7 +208,7 @@ class CompactionIterator {
BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
bool enforce_single_del_contracts,
const std::atomic<bool>& manual_compaction_canceled,
const Compaction* compaction = nullptr,
bool must_count_input_entries, const Compaction* compaction = nullptr,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const std::shared_ptr<Logger> info_log = nullptr,
Expand All @@ -219,6 +228,7 @@ class CompactionIterator {
bool enforce_single_del_contracts,
const std::atomic<bool>& manual_compaction_canceled,
std::unique_ptr<CompactionProxy> compaction,
bool must_count_input_entries,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const std::shared_ptr<Logger> info_log = nullptr,
Expand Down Expand Up @@ -253,7 +263,8 @@ class CompactionIterator {
return current_user_key_;
}
const CompactionIterationStats& iter_stats() const { return iter_stats_; }
uint64_t num_input_entry_scanned() const { return input_.num_itered(); }
bool HasNumInputEntryScanned() const { return input_.HasNumItered(); }
uint64_t NumInputEntryScanned() const { return input_.NumItered(); }
// If the current key should be placed on penultimate level, only valid if
// per_key_placement is supported
bool output_to_penultimate_level() const {
Expand Down
4 changes: 2 additions & 2 deletions db/compaction/compaction_iterator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
nullptr /* blob_file_builder */, true /*allow_data_in_errors*/,
true /*enforce_single_del_contracts*/,
/*manual_compaction_canceled=*/kManualCompactionCanceledFalse_,
std::move(compaction), filter, &shutting_down_, /*info_log=*/nullptr,
full_history_ts_low));
std::move(compaction), /*must_count_input_entries=*/false, filter,
&shutting_down_, /*info_log=*/nullptr, full_history_ts_low));
}

void AddSnapshot(SequenceNumber snapshot,
Expand Down
121 changes: 93 additions & 28 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -796,15 +796,46 @@ Status CompactionJob::Run() {
auto fn =
TableFileName(state.compaction->immutable_options()->cf_paths,
output.meta.fd.GetNumber(), output.meta.fd.GetPathId());
tp[fn] = output.table_properties;
compact_->compaction->SetOutputTableProperties(fn,
output.table_properties);
}
}
compact_->compaction->SetOutputTableProperties(std::move(tp));

// Finish up all book-keeping to unify the subcompaction results
// Finish up all bookkeeping to unify the subcompaction results.
compact_->AggregateCompactionStats(compaction_stats_, *compaction_job_stats_);
UpdateCompactionStats();

uint64_t num_input_range_del = 0;
bool ok = UpdateCompactionStats(&num_input_range_del);
// (Sub)compactions returned ok, do sanity check on the number of input keys.
if (status.ok() && ok && compaction_job_stats_->has_num_input_records) {
size_t ts_sz = compact_->compaction->column_family_data()
->user_comparator()
->timestamp_size();
// When trim_ts_ is non-empty, CompactionIterator takes
// HistoryTrimmingIterator as input iterator and sees a trimmed view of
// input keys. So the number of keys it processed is not suitable for
// verification here.
// TODO: support verification when trim_ts_ is non-empty.
if (!(ts_sz > 0 && !trim_ts_.empty()) &&
db_options_.compaction_verify_record_count) {
assert(compaction_stats_.stats.num_input_records > 0);
// TODO: verify the number of range deletion entries.
uint64_t expected =
compaction_stats_.stats.num_input_records - num_input_range_del;
uint64_t actual = compaction_job_stats_->num_input_records;
if (expected != actual) {
std::string msg =
"Total number of input records: " + std::to_string(expected) +
", but processed " + std::to_string(actual) + " records.";
ROCKS_LOG_WARN(
db_options_.info_log, "[%s] [JOB %d] Compaction %s",
compact_->compaction->column_family_data()->GetName().c_str(),
job_context_->job_id, msg.c_str());
status = Status::Corruption(
"Compaction number of input keys does not match number of keys "
"processed.");
}
}
}
RecordCompactionIOStats();
LogFlush(db_options_.info_log);
TEST_SYNC_POINT("CompactionJob::Run():End");
Expand Down Expand Up @@ -1252,6 +1283,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
/*expect_valid_internal_key=*/true, range_del_agg.get(),
blob_file_builder.get(), db_options_.allow_data_in_errors,
db_options_.enforce_single_del_contracts, manual_compaction_canceled_,
sub_compact->compaction
->DoesInputReferenceBlobFiles() /* must_count_input_entries */,
sub_compact->compaction, compaction_filter, shutting_down_,
db_options_.info_log, full_history_ts_low, preserve_time_min_seqno_,
preclude_last_level_min_seqno_);
Expand Down Expand Up @@ -1316,8 +1349,25 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
if (c_iter->status().IsManualCompactionPaused()) {
break;
}

#ifndef NDEBUG
bool stop = false;
TEST_SYNC_POINT_CALLBACK("CompactionJob::ProcessKeyValueCompaction()::stop",
static_cast<void*>(&stop));
if (stop) {
break;
}
#endif // NDEBUG
}

// This number may not be accurate when CompactionIterator was created
// with `must_count_input_entries=false`.
assert(!sub_compact->compaction->DoesInputReferenceBlobFiles() ||
c_iter->HasNumInputEntryScanned());
sub_compact->compaction_job_stats.has_num_input_records =
c_iter->HasNumInputEntryScanned();
sub_compact->compaction_job_stats.num_input_records =
c_iter->NumInputEntryScanned();
sub_compact->compaction_job_stats.num_blobs_read =
c_iter_stats.num_blobs_read;
sub_compact->compaction_job_stats.total_blob_bytes_read =
Expand Down Expand Up @@ -1903,24 +1953,53 @@ void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) {
}
} // namespace


void CompactionJob::UpdateCompactionStats() {
bool CompactionJob::UpdateCompactionStats(uint64_t* num_input_range_del) {
assert(compact_);

Compaction* compaction = compact_->compaction;
compaction_stats_.stats.num_input_files_in_non_output_levels = 0;
compaction_stats_.stats.num_input_files_in_output_level = 0;

bool has_error = false;
const ReadOptions read_options(Env::IOActivity::kCompaction);
const auto& input_table_properties = compaction->GetTableProperties();
for (int input_level = 0;
input_level < static_cast<int>(compaction->num_input_levels());
++input_level) {
size_t num_input_files = compaction->num_input_files(input_level);
uint64_t* bytes_read;
if (compaction->level(input_level) != compaction->output_level()) {
UpdateCompactionInputStatsHelper(
&compaction_stats_.stats.num_input_files_in_non_output_levels,
&compaction_stats_.stats.bytes_read_non_output_levels, input_level);
compaction_stats_.stats.num_input_files_in_non_output_levels +=
static_cast<int>(num_input_files);
bytes_read = &compaction_stats_.stats.bytes_read_non_output_levels;
} else {
UpdateCompactionInputStatsHelper(
&compaction_stats_.stats.num_input_files_in_output_level,
&compaction_stats_.stats.bytes_read_output_level, input_level);
compaction_stats_.stats.num_input_files_in_output_level +=
static_cast<int>(num_input_files);
bytes_read = &compaction_stats_.stats.bytes_read_output_level;
}
for (size_t i = 0; i < num_input_files; ++i) {
const FileMetaData* file_meta = compaction->input(input_level, i);
*bytes_read += file_meta->fd.GetFileSize();
uint64_t file_input_entries = file_meta->num_entries;
uint64_t file_num_range_del = file_meta->num_range_deletions;
if (file_input_entries == 0) {
uint64_t file_number = file_meta->fd.GetNumber();
// Try getting info from table property
std::string fn =
TableFileName(compaction->immutable_options()->cf_paths,
file_number, file_meta->fd.GetPathId());
const auto& tp = input_table_properties.find(fn);
if (tp != input_table_properties.end()) {
file_input_entries = tp->second->num_entries;
file_num_range_del = tp->second->num_range_deletions;
} else {
has_error = true;
}
}
compaction_stats_.stats.num_input_records += file_input_entries;
if (num_input_range_del) {
*num_input_range_del += file_num_range_del;
}
}
}

Expand All @@ -1930,21 +2009,7 @@ void CompactionJob::UpdateCompactionStats() {

compaction_stats_.stats.num_dropped_records =
compaction_stats_.DroppedRecords();
}

void CompactionJob::UpdateCompactionInputStatsHelper(int* num_files,
uint64_t* bytes_read,
int input_level) {
const Compaction* compaction = compact_->compaction;
auto num_input_files = compaction->num_input_files(input_level);
*num_files += static_cast<int>(num_input_files);

for (size_t i = 0; i < num_input_files; ++i) {
const auto* file_meta = compaction->input(input_level, i);
*bytes_read += file_meta->fd.GetFileSize();
compaction_stats_.stats.num_input_records +=
static_cast<uint64_t>(file_meta->num_entries);
}
return !has_error;
}

void CompactionJob::UpdateCompactionJobStats(
Expand Down
Loading

0 comments on commit 6a0f637

Please sign in to comment.