Skip to content

Commit

Permalink
Lower the risk for users to run options.force_consistency_checks = tr…
Browse files Browse the repository at this point in the history
…ue (facebook#5744)

Summary:
Open-source users recently reported two occurrences of LSM-tree corruption (facebook#5558 is one), which would be caught by options.force_consistency_checks = true. options.force_consistency_checks has a usability limitation because it crashes the service once inconsistency is detected. This makes the feature hard to use. Most users serve from multiple RocksDB shards per server and the impacts of crashing the service is higher than it should be.

Instead, we just pass the error back to users without killing the service, and ask them to deal with the problem accordingly.
Pull Request resolved: facebook#5744

Differential Revision: D17096940

Pulled By: pdhandharia

fbshipit-source-id: b6780039044e265f26ed2ad03c51f4abbe8b603c
  • Loading branch information
pdhandharia authored and facebook-github-bot committed Aug 29, 2019
1 parent 1729779 commit a281822
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 41 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* Fix bloom filter lookups by the MultiGet batching API when BlockBasedTableOptions::whole_key_filtering is false, by checking that a key is in the perfix_extractor domain and extracting the prefix before looking up.
### New Features
* VerifyChecksum() by default will issue readahead. Allow ReadOptions to be passed in to those functions to override the readhead size. For checksum verifying before external SST file ingestion, a new option IngestExternalFileOptions.verify_checksums_readahead_size, is added for this readahead setting.
* When user uses options.force_consistency_check in RocksDb, instead of crashing the process, we now pass the error back to the users without killing the process.
### Public API Change
* Added max_write_buffer_size_to_maintain option to better control memory usage of immutable memtables.

Expand Down
24 changes: 24 additions & 0 deletions db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4658,7 +4658,31 @@ TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) {
ASSERT_OK(dbfull()->TEST_WaitForCompact());
Close();
}
TEST_F(DBCompactionTest, ConsistencyFailTest) {
Options options = CurrentOptions();
DestroyAndReopen(options);

rocksdb::SyncPoint::GetInstance()->SetCallBack(
"VersionBuilder::CheckConsistency", [&](void* arg) {
auto p =
reinterpret_cast<std::pair<FileMetaData**, FileMetaData**>*>(arg);
// just swap the two FileMetaData so that we hit error
// in CheckConsistency funcion
FileMetaData* temp = *(p->first);
*(p->first) = *(p->second);
*(p->second) = temp;
});

rocksdb::SyncPoint::GetInstance()->EnableProcessing();

for (int k = 0; k < 2; ++k) {
ASSERT_OK(Put("foo", "bar"));
Flush();
}

ASSERT_NOK(Put("foo", "bar"));
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
#endif // !defined(ROCKSDB_LITE)
} // namespace rocksdb

Expand Down
86 changes: 61 additions & 25 deletions db/version_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "db/version_set.h"
#include "port/port.h"
#include "table/table_reader.h"
#include "util/string_util.h"

namespace rocksdb {

Expand Down Expand Up @@ -138,12 +139,12 @@ class VersionBuilder::Rep {
}
}

void CheckConsistency(VersionStorageInfo* vstorage) {
Status CheckConsistency(VersionStorageInfo* vstorage) {
#ifdef NDEBUG
if (!vstorage->force_consistency_checks()) {
// Dont run consistency checks in release mode except if
// explicitly asked to
return;
return Status::OK();
}
#endif
// make sure the files are sorted correctly
Expand All @@ -152,10 +153,14 @@ class VersionBuilder::Rep {
for (size_t i = 1; i < level_files.size(); i++) {
auto f1 = level_files[i - 1];
auto f2 = level_files[i];
#ifndef NDEBUG
auto pair = std::make_pair(&f1, &f2);
TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistency", &pair);
#endif
if (level == 0) {
if (!level_zero_cmp_(f1, f2)) {
fprintf(stderr, "L0 files are not sorted properly");
abort();
return Status::Corruption("L0 files are not sorted properly");
}

if (f2->fd.smallest_seqno == f2->fd.largest_seqno) {
Expand All @@ -168,20 +173,34 @@ class VersionBuilder::Rep {
" vs. file with global_seqno %" PRIu64 "\n",
f1->fd.smallest_seqno, f1->fd.largest_seqno,
external_file_seqno);
abort();
return Status::Corruption("L0 file with seqno " +
NumberToString(f1->fd.smallest_seqno) +
" " +
NumberToString(f1->fd.largest_seqno) +
" vs. file with global_seqno" +
NumberToString(external_file_seqno) +
" with fileNumber " +
NumberToString(f1->fd.GetNumber()));
}
} else if (f1->fd.smallest_seqno <= f2->fd.smallest_seqno) {
fprintf(stderr,
"L0 files seqno %" PRIu64 " %" PRIu64 " vs. %" PRIu64
" %" PRIu64 "\n",
f1->fd.smallest_seqno, f1->fd.largest_seqno,
f2->fd.smallest_seqno, f2->fd.largest_seqno);
abort();
return Status::Corruption(
"L0 files seqno " + NumberToString(f1->fd.smallest_seqno) +
" " + NumberToString(f1->fd.largest_seqno) + " " +
NumberToString(f1->fd.GetNumber()) + " vs. " +
NumberToString(f2->fd.smallest_seqno) + " " +
NumberToString(f2->fd.largest_seqno) + " " +
NumberToString(f2->fd.GetNumber()));
}
} else {
if (!level_nonzero_cmp_(f1, f2)) {
fprintf(stderr, "L%d files are not sorted properly", level);
abort();
return Status::Corruption("L" + NumberToString(level) +
" files are not sorted properly");
}

// Make sure there is no overlap in levels > 0
Expand All @@ -190,20 +209,24 @@ class VersionBuilder::Rep {
fprintf(stderr, "L%d have overlapping ranges %s vs. %s\n", level,
(f1->largest).DebugString(true).c_str(),
(f2->smallest).DebugString(true).c_str());
abort();
return Status::Corruption(
"L" + NumberToString(level) + " have overlapping ranges " +
(f1->largest).DebugString(true) + " vs. " +
(f2->smallest).DebugString(true));
}
}
}
}
return Status::OK();
}

void CheckConsistencyForDeletes(VersionEdit* /*edit*/, uint64_t number,
int level) {
Status CheckConsistencyForDeletes(VersionEdit* /*edit*/, uint64_t number,
int level) {
#ifdef NDEBUG
if (!base_vstorage_->force_consistency_checks()) {
// Dont run consistency checks in release mode except if
// explicitly asked to
return;
return Status::OK();
}
#endif
// a file to be deleted better exist in the previous version
Expand Down Expand Up @@ -241,8 +264,9 @@ class VersionBuilder::Rep {
}
if (!found) {
fprintf(stderr, "not found %" PRIu64 "\n", number);
abort();
return Status::Corruption("not found " + NumberToString(number));
}
return Status::OK();
}

bool CheckConsistencyForNumLevels() {
Expand All @@ -259,8 +283,11 @@ class VersionBuilder::Rep {
}

// Apply all of the edits in *edit to the current state.
void Apply(VersionEdit* edit) {
CheckConsistency(base_vstorage_);
Status Apply(VersionEdit* edit) {
Status s = CheckConsistency(base_vstorage_);
if (!s.ok()) {
return s;
}

// Delete files
const VersionEdit::DeletedFileSet& del = edit->GetDeletedFiles();
Expand Down Expand Up @@ -308,12 +335,20 @@ class VersionBuilder::Rep {
}
}
}
return s;
}

// Save the current state in *v.
void SaveTo(VersionStorageInfo* vstorage) {
CheckConsistency(base_vstorage_);
CheckConsistency(vstorage);
Status SaveTo(VersionStorageInfo* vstorage) {
Status s = CheckConsistency(base_vstorage_);
if (!s.ok()) {
return s;
}

s = CheckConsistency(vstorage);
if (!s.ok()) {
return s;
}

for (int level = 0; level < num_levels_; level++) {
const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_;
Expand Down Expand Up @@ -357,7 +392,8 @@ class VersionBuilder::Rep {
}
}

CheckConsistency(vstorage);
s = CheckConsistency(vstorage);
return s;
}

Status LoadTableHandlers(InternalStats* internal_stats, int max_threads,
Expand Down Expand Up @@ -475,23 +511,23 @@ VersionBuilder::VersionBuilder(const EnvOptions& env_options,

VersionBuilder::~VersionBuilder() { delete rep_; }

void VersionBuilder::CheckConsistency(VersionStorageInfo* vstorage) {
rep_->CheckConsistency(vstorage);
Status VersionBuilder::CheckConsistency(VersionStorageInfo* vstorage) {
return rep_->CheckConsistency(vstorage);
}

void VersionBuilder::CheckConsistencyForDeletes(VersionEdit* edit,
uint64_t number, int level) {
rep_->CheckConsistencyForDeletes(edit, number, level);
Status VersionBuilder::CheckConsistencyForDeletes(VersionEdit* edit,
uint64_t number, int level) {
return rep_->CheckConsistencyForDeletes(edit, number, level);
}

bool VersionBuilder::CheckConsistencyForNumLevels() {
return rep_->CheckConsistencyForNumLevels();
}

void VersionBuilder::Apply(VersionEdit* edit) { rep_->Apply(edit); }
Status VersionBuilder::Apply(VersionEdit* edit) { return rep_->Apply(edit); }

void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
rep_->SaveTo(vstorage);
Status VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
return rep_->SaveTo(vstorage);
}

Status VersionBuilder::LoadTableHandlers(
Expand Down
10 changes: 5 additions & 5 deletions db/version_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ class VersionBuilder {
VersionBuilder(const EnvOptions& env_options, TableCache* table_cache,
VersionStorageInfo* base_vstorage, Logger* info_log = nullptr);
~VersionBuilder();
void CheckConsistency(VersionStorageInfo* vstorage);
void CheckConsistencyForDeletes(VersionEdit* edit, uint64_t number,
int level);
Status CheckConsistency(VersionStorageInfo* vstorage);
Status CheckConsistencyForDeletes(VersionEdit* edit, uint64_t number,
int level);
bool CheckConsistencyForNumLevels();
void Apply(VersionEdit* edit);
void SaveTo(VersionStorageInfo* vstorage);
Status Apply(VersionEdit* edit);
Status SaveTo(VersionStorageInfo* vstorage);
Status LoadTableHandlers(InternalStats* internal_stats, int max_threads,
bool prefetch_index_and_filter_in_cache,
bool is_initial_load,
Expand Down
43 changes: 34 additions & 9 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3622,15 +3622,29 @@ Status VersionSet::ProcessManifestWrites(
} else if (group_start != std::numeric_limits<size_t>::max()) {
group_start = std::numeric_limits<size_t>::max();
}
LogAndApplyHelper(last_writer->cfd, builder, e, mu);
Status s = LogAndApplyHelper(last_writer->cfd, builder, e, mu);
if (!s.ok()) {
// free up the allocated memory
for (auto v : versions) {
delete v;
}
return s;
}
batch_edits.push_back(e);
}
}
for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
assert(!builder_guards.empty() &&
builder_guards.size() == versions.size());
auto* builder = builder_guards[i]->version_builder();
builder->SaveTo(versions[i]->storage_info());
Status s = builder->SaveTo(versions[i]->storage_info());
if (!s.ok()) {
// free up the allocated memory
for (auto v : versions) {
delete v;
}
return s;
}
}
}

Expand Down Expand Up @@ -4010,9 +4024,9 @@ void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
}
}

void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
VersionBuilder* builder, VersionEdit* edit,
InstrumentedMutex* mu) {
Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
VersionBuilder* builder, VersionEdit* edit,
InstrumentedMutex* mu) {
#ifdef NDEBUG
(void)cfd;
#endif
Expand All @@ -4036,7 +4050,9 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
: last_sequence_);

builder->Apply(edit);
Status s = builder->Apply(edit);

return s;
}

Status VersionSet::ApplyOneVersionEditToBuilder(
Expand Down Expand Up @@ -4129,7 +4145,10 @@ Status VersionSet::ApplyOneVersionEditToBuilder(
// to builder
auto builder = builders.find(edit.column_family_);
assert(builder != builders.end());
builder->second->version_builder()->Apply(&edit);
Status s = builder->second->version_builder()->Apply(&edit);
if (!s.ok()) {
return s;
}
}
return ExtractInfoFromVersionEdit(
cfd, edit, have_log_number, log_number, have_prev_log_number,
Expand Down Expand Up @@ -4748,7 +4767,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
// to builder
auto builder = builders.find(edit.column_family_);
assert(builder != builders.end());
builder->second->version_builder()->Apply(&edit);
s = builder->second->version_builder()->Apply(&edit);
if (!s.ok()) {
break;
}
}

if (cfd != nullptr && edit.has_log_number_) {
Expand Down Expand Up @@ -5767,7 +5789,10 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
}
active_version_builders_.erase(builder_iter);
} else {
builder->Apply(&edit);
Status s = builder->Apply(&edit);
if (!s.ok()) {
return s;
}
}
Status s = ExtractInfoFromVersionEdit(
cfd, edit, have_log_number, log_number, have_prev_log_number,
Expand Down
4 changes: 2 additions & 2 deletions db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -1154,8 +1154,8 @@ class VersionSet {
const ColumnFamilyOptions* new_cf_options);

void LogAndApplyCFHelper(VersionEdit* edit);
void LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b,
VersionEdit* edit, InstrumentedMutex* mu);
Status LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b,
VersionEdit* edit, InstrumentedMutex* mu);
};

// ReactiveVersionSet represents a collection of versions of the column
Expand Down

0 comments on commit a281822

Please sign in to comment.