Skip to content

Commit

Permalink
Added compaction read errors to db_stress (facebook#11789)
Browse files Browse the repository at this point in the history
Summary:
- Fixed misspellings of "inject"
- Made user read errors retryable when `FLAGS_inject_error_severity == 1`
- Added compaction read errors when `FLAGS_read_fault_one_in > 0`. These are always retryable so that the DB will keep accepting writes
- Reenabled setting `compaction_readahead_size` in crash test. The reason for disabling it was to "keep the test clean", which is not a good enough reason to skip testing it

Pull Request resolved: facebook#11789

Test Plan:
With facebook#11782 reverted, reproduced the bug:
- Build: `make -j56 db_stress`
- Command: `TEST_TMPDIR=/dev/shm python3 tools/db_crashtest.py blackbox --simple --write_buffer_size=524288 --target_file_size_base=524288 --max_bytes_for_level_base=2097152 --interval=10 --max_key=1000000`
- Output:
```
stderr has error message:
***put or merge error: Corruption: Compaction number of input keys does not match number of keys processed.***
```

Reviewed By: cbi42

Differential Revision: D48939994

Pulled By: ajkr

fbshipit-source-id: a1efb799efecdfd5d9cfd185e4a6321db8fccfbb
  • Loading branch information
ajkr authored and facebook-github-bot committed Sep 5, 2023
1 parent d01b121 commit 392d695
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 36 deletions.
4 changes: 2 additions & 2 deletions db_stress_tool/db_stress_gflags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1027,8 +1027,8 @@ DEFINE_int32(open_write_fault_one_in, 0,
DEFINE_int32(open_read_fault_one_in, 0,
"On non-zero, enables fault injection on file reads "
"during DB reopen.");
DEFINE_int32(injest_error_severity, 1,
"The severity of the injested IO Error. 1 is soft error (e.g. "
DEFINE_int32(inject_error_severity, 1,
"The severity of the injected IO Error. 1 is soft error (e.g. "
"retryable error), 2 is fatal error, and the default is "
"retryable error.");
DEFINE_int32(prepopulate_block_cache,
Expand Down
15 changes: 15 additions & 0 deletions db_stress_tool/db_stress_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <mutex>
#include <unordered_set>

#include "db_stress_tool/db_stress_shared_state.h"
#include "file/filename.h"
#include "file/writable_file_writer.h"
#include "rocksdb/db.h"
Expand All @@ -19,9 +20,12 @@
#include "rocksdb/unique_id.h"
#include "util/gflags_compat.h"
#include "util/random.h"
#include "utilities/fault_injection_fs.h"

DECLARE_int32(compact_files_one_in);

extern std::shared_ptr<ROCKSDB_NAMESPACE::FaultInjectionTestFS> fault_fs_guard;

namespace ROCKSDB_NAMESPACE {

// Verify across process executions that all seen IDs are unique
Expand Down Expand Up @@ -95,6 +99,17 @@ class DbStressListener : public EventListener {
RandomSleep();
}

void OnSubcompactionBegin(const SubcompactionJobInfo& /* si */) override {
if (FLAGS_read_fault_one_in) {
// Hardcoded to inject retryable error as a non-retryable error would put
// the DB in read-only mode and then it would crash on the next write.
fault_fs_guard->SetThreadLocalReadErrorContext(
static_cast<uint32_t>(FLAGS_seed), FLAGS_read_fault_one_in,
true /* retryable */);
fault_fs_guard->EnableErrorInjection();
}
}

void OnTableFileCreationStarted(
const TableFileCreationBriefInfo& /*info*/) override {
++num_pending_file_creations_;
Expand Down
2 changes: 1 addition & 1 deletion db_stress_tool/db_stress_shared_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ DECLARE_int32(open_metadata_write_fault_one_in);
DECLARE_int32(open_write_fault_one_in);
DECLARE_int32(open_read_fault_one_in);

DECLARE_int32(injest_error_severity);
DECLARE_int32(inject_error_severity);

namespace ROCKSDB_NAMESPACE {
class StressTest;
Expand Down
43 changes: 22 additions & 21 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -773,17 +773,18 @@ void StressTest::OperateDb(ThreadState* thread) {

#ifndef NDEBUG
if (FLAGS_read_fault_one_in) {
fault_fs_guard->SetThreadLocalReadErrorContext(thread->shared->GetSeed(),
FLAGS_read_fault_one_in);
fault_fs_guard->SetThreadLocalReadErrorContext(
thread->shared->GetSeed(), FLAGS_read_fault_one_in,
FLAGS_inject_error_severity == 1 /* retryable */);
}
#endif // NDEBUG
if (FLAGS_write_fault_one_in) {
IOStatus error_msg;
if (FLAGS_injest_error_severity <= 1 || FLAGS_injest_error_severity > 2) {
if (FLAGS_inject_error_severity <= 1 || FLAGS_inject_error_severity > 2) {
error_msg = IOStatus::IOError("Retryable IO Error");
error_msg.SetRetryable(true);
} else if (FLAGS_injest_error_severity == 2) {
// Ingest the fatal error
} else if (FLAGS_inject_error_severity == 2) {
// Inject a fatal error
error_msg = IOStatus::IOError("Fatal IO Error");
error_msg.SetDataLoss(true);
}
Expand Down Expand Up @@ -2684,14 +2685,14 @@ void StressTest::Open(SharedState* shared, bool reopen) {
RegisterAdditionalListeners();

if (!FLAGS_use_txn) {
// Determine whether we need to ingest file metadata write failures
// Determine whether we need to inject file metadata write failures
// during DB reopen. If it does, enable it.
// Only ingest metadata error if it is reopening, as initial open
// Only inject metadata error if it is reopening, as initial open
// failure doesn't need to be handled.
// TODO cover transaction DB is not covered in this fault test too.
bool ingest_meta_error = false;
bool ingest_write_error = false;
bool ingest_read_error = false;
bool inject_meta_error = false;
bool inject_write_error = false;
bool inject_read_error = false;
if ((FLAGS_open_metadata_write_fault_one_in ||
FLAGS_open_write_fault_one_in || FLAGS_open_read_fault_one_in) &&
fault_fs_guard
Expand All @@ -2704,23 +2705,23 @@ void StressTest::Open(SharedState* shared, bool reopen) {
// solve it, skip WAL from failure injection.
fault_fs_guard->SetSkipDirectWritableTypes({kWalFile});
}
ingest_meta_error = FLAGS_open_metadata_write_fault_one_in;
ingest_write_error = FLAGS_open_write_fault_one_in;
ingest_read_error = FLAGS_open_read_fault_one_in;
if (ingest_meta_error) {
inject_meta_error = FLAGS_open_metadata_write_fault_one_in;
inject_write_error = FLAGS_open_write_fault_one_in;
inject_read_error = FLAGS_open_read_fault_one_in;
if (inject_meta_error) {
fault_fs_guard->EnableMetadataWriteErrorInjection();
fault_fs_guard->SetRandomMetadataWriteError(
FLAGS_open_metadata_write_fault_one_in);
}
if (ingest_write_error) {
if (inject_write_error) {
fault_fs_guard->SetFilesystemDirectWritable(false);
fault_fs_guard->EnableWriteErrorInjection();
fault_fs_guard->SetRandomWriteError(
static_cast<uint32_t>(FLAGS_seed), FLAGS_open_write_fault_one_in,
IOStatus::IOError("Injected Open Error"),
/*inject_for_all_file_types=*/true, /*types=*/{});
}
if (ingest_read_error) {
if (inject_read_error) {
fault_fs_guard->SetRandomReadError(FLAGS_open_read_fault_one_in);
}
}
Expand Down Expand Up @@ -2752,14 +2753,14 @@ void StressTest::Open(SharedState* shared, bool reopen) {
}
}

if (ingest_meta_error || ingest_write_error || ingest_read_error) {
if (inject_meta_error || inject_write_error || inject_read_error) {
fault_fs_guard->SetFilesystemDirectWritable(true);
fault_fs_guard->DisableMetadataWriteErrorInjection();
fault_fs_guard->DisableWriteErrorInjection();
fault_fs_guard->SetSkipDirectWritableTypes({});
fault_fs_guard->SetRandomReadError(0);
if (s.ok()) {
// Ingested errors might happen in background compactions. We
// Injected errors might happen in background compactions. We
// wait for all compactions to finish to make sure DB is in
// clean state before executing queries.
s = db_->GetRootDB()->WaitForCompact(WaitForCompactOptions());
Expand All @@ -2776,9 +2777,9 @@ void StressTest::Open(SharedState* shared, bool reopen) {
// After failure to opening a DB due to IO error, retry should
// successfully open the DB with correct data if no IO error shows
// up.
ingest_meta_error = false;
ingest_write_error = false;
ingest_read_error = false;
inject_meta_error = false;
inject_write_error = false;
inject_read_error = false;

// TODO: Unsynced data loss during DB reopen is not supported yet in
// stress test. Will need to recreate expected state if we decide
Expand Down
8 changes: 4 additions & 4 deletions db_stress_tool/no_batched_ops_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1312,7 +1312,7 @@ class NonBatchedOpsStressTest : public StressTest {
pending_expected_value.Commit();

if (!s.ok()) {
if (FLAGS_injest_error_severity >= 2) {
if (FLAGS_inject_error_severity >= 2) {
if (!is_db_stopped_ && s.severity() >= Status::Severity::kFatalError) {
is_db_stopped_ = true;
} else if (!is_db_stopped_ ||
Expand Down Expand Up @@ -1371,7 +1371,7 @@ class NonBatchedOpsStressTest : public StressTest {

thread->stats.AddDeletes(1);
if (!s.ok()) {
if (FLAGS_injest_error_severity >= 2) {
if (FLAGS_inject_error_severity >= 2) {
if (!is_db_stopped_ &&
s.severity() >= Status::Severity::kFatalError) {
is_db_stopped_ = true;
Expand Down Expand Up @@ -1402,7 +1402,7 @@ class NonBatchedOpsStressTest : public StressTest {
pending_expected_value.Commit();
thread->stats.AddSingleDeletes(1);
if (!s.ok()) {
if (FLAGS_injest_error_severity >= 2) {
if (FLAGS_inject_error_severity >= 2) {
if (!is_db_stopped_ &&
s.severity() >= Status::Severity::kFatalError) {
is_db_stopped_ = true;
Expand Down Expand Up @@ -1464,7 +1464,7 @@ class NonBatchedOpsStressTest : public StressTest {
s = db_->DeleteRange(write_opts, cfh, key, end_key);
}
if (!s.ok()) {
if (FLAGS_injest_error_severity >= 2) {
if (FLAGS_inject_error_severity >= 2) {
if (!is_db_stopped_ && s.severity() >= Status::Severity::kFatalError) {
is_db_stopped_ = true;
} else if (!is_db_stopped_ ||
Expand Down
5 changes: 2 additions & 3 deletions tools/db_crashtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,8 @@
"sync": lambda: random.choice([1 if t == 0 else 0 for t in range(0, 20)]),
"bytes_per_sync": lambda: random.choice([0, 262144]),
"wal_bytes_per_sync": lambda: random.choice([0, 524288]),
# Disable compaction_readahead_size because the test is not passing.
# "compaction_readahead_size" : lambda : random.choice(
# [0, 0, 1024 * 1024]),
"compaction_readahead_size" : lambda : random.choice(
[0, 0, 1024 * 1024]),
"db_write_buffer_size": lambda: random.choice(
[0, 0, 0, 1024 * 1024, 8 * 1024 * 1024, 128 * 1024 * 1024]
),
Expand Down
10 changes: 7 additions & 3 deletions utilities/fault_injection_fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,7 @@ IOStatus FaultInjectionTestFS::InjectThreadSpecificReadError(
return IOStatus::OK();
}

IOStatus ret;
if (ctx->rand.OneIn(ctx->one_in)) {
if (ctx->count == 0) {
ctx->message = "";
Expand All @@ -972,7 +973,7 @@ IOStatus FaultInjectionTestFS::InjectThreadSpecificReadError(
// Likely non-per read status code for MultiRead
ctx->message += "error; ";
ret_fault_injected = true;
return IOStatus::IOError();
ret = IOStatus::IOError();
} else if (Random::GetTLSInstance()->OneIn(8)) {
assert(result);
// For a small chance, set the failure to status but turn the
Expand Down Expand Up @@ -1000,10 +1001,13 @@ IOStatus FaultInjectionTestFS::InjectThreadSpecificReadError(
} else {
ctx->message += "error result multiget single; ";
ret_fault_injected = true;
return IOStatus::IOError();
ret = IOStatus::IOError();
}
}
return IOStatus::OK();
if (ctx->retryable) {
ret.SetRetryable(true);
}
return ret;
}

bool FaultInjectionTestFS::TryParseFileName(const std::string& file_name,
Expand Down
8 changes: 6 additions & 2 deletions utilities/fault_injection_fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,8 @@ class FaultInjectionTestFS : public FileSystemWrapper {
// seed is the seed for the random number generator, and one_in determines
// the probability of injecting error (i.e an error is injected with
// 1/one_in probability)
void SetThreadLocalReadErrorContext(uint32_t seed, int one_in) {
void SetThreadLocalReadErrorContext(uint32_t seed, int one_in,
bool retryable) {
struct ErrorContext* ctx =
static_cast<struct ErrorContext*>(thread_local_error_->Get());
if (ctx == nullptr) {
Expand All @@ -411,6 +412,7 @@ class FaultInjectionTestFS : public FileSystemWrapper {
}
ctx->one_in = one_in;
ctx->count = 0;
ctx->retryable = retryable;
}

static void DeleteThreadLocalErrorContext(void* p) {
Expand Down Expand Up @@ -556,12 +558,14 @@ class FaultInjectionTestFS : public FileSystemWrapper {
std::string message;
int frames;
ErrorType type;
bool retryable;

explicit ErrorContext(uint32_t seed)
: rand(seed),
enable_error_injection(false),
callstack(nullptr),
frames(0) {}
frames(0),
retryable(false) {}
~ErrorContext() {
if (callstack) {
free(callstack);
Expand Down

0 comments on commit 392d695

Please sign in to comment.