From 8a9cfd52924c58e74935a604f89bd87f318b4ac3 Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Tue, 10 Oct 2023 06:29:01 -0700 Subject: [PATCH] Make stopped writes block on recovery (#11879) Summary: Relaxed the constraints for blocking when writes are stopped. When a recovery is already being attempted, we might as well let `!no_slowdown` writes wait on it in case it succeeds. This makes the user-visible behavior consistent across recovery flush and non-recovery flush. This enables `db_stress` to inject retryable (soft) flush read errors without having to handle user write failures. I changed `db_stress` a bit to permit injected errors in much more foreground operations as more admin operations (like `GetLiveFiles()`) can fail on a retryable error during flush. Pull Request resolved: https://github.com/facebook/rocksdb/pull/11879 Reviewed By: anand1976 Differential Revision: D49571196 Pulled By: ajkr fbshipit-source-id: 5d516d6faf20d2c6bfe0594ab4f2706bca6d69b0 --- db/db_impl/db_impl_write.cc | 10 +-- db_stress_tool/db_stress_listener.h | 12 +++ db_stress_tool/db_stress_test_base.cc | 74 +++++++------------ db_stress_tool/db_stress_test_base.h | 4 +- db_stress_tool/multi_ops_txns_stress.cc | 25 ++++--- .../stopped_writes_wait_for_recovery.md | 1 + 6 files changed, 61 insertions(+), 65 deletions(-) create mode 100644 unreleased_history/behavior_changes/stopped_writes_wait_for_recovery.md diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index bc260c5a8fe..505a37883a9 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1858,11 +1858,11 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread, write_thread.EndWriteStall(); } - // Don't wait if there's a background error, even if its a soft error. We - // might wait here indefinitely as the background compaction may never - // finish successfully, resulting in the stall condition lasting - // indefinitely - while (error_handler_.GetBGError().ok() && write_controller_.IsStopped() && + // Don't wait if there's a background error that is not pending recovery + // since recovery might never be attempted. + while ((error_handler_.GetBGError().ok() || + error_handler_.IsRecoveryInProgress()) && + write_controller_.IsStopped() && !shutting_down_.load(std::memory_order_relaxed)) { if (write_options.no_slowdown) { return Status::Incomplete("Write stall"); diff --git a/db_stress_tool/db_stress_listener.h b/db_stress_tool/db_stress_listener.h index aba95d4c0b4..505b0a604df 100644 --- a/db_stress_tool/db_stress_listener.h +++ b/db_stress_tool/db_stress_listener.h @@ -71,11 +71,23 @@ class DbStressListener : public EventListener { VerifyFilePath(info.file_path); // pretending doing some work here RandomSleep(); + if (FLAGS_read_fault_one_in) { + (void)fault_fs_guard->GetAndResetErrorCount(); + fault_fs_guard->DisableErrorInjection(); + } } void OnFlushBegin(DB* /*db*/, const FlushJobInfo& /*flush_job_info*/) override { RandomSleep(); + if (FLAGS_read_fault_one_in) { + // Hardcoded to inject retryable error as a non-retryable error would put + // the DB in read-only mode and then it would crash on the next write. 
+      fault_fs_guard->SetThreadLocalReadErrorContext(
+          static_cast<uint32_t>(FLAGS_seed), FLAGS_read_fault_one_in,
+          true /* retryable */);
+      fault_fs_guard->EnableErrorInjection();
+    }
   }
 
   void OnTableFileDeleted(const TableFileDeletionInfo& /*info*/) override {
diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc
index f7dee86b28b..5b843eb5df7 100644
--- a/db_stress_tool/db_stress_test_base.cc
+++ b/db_stress_tool/db_stress_test_base.cc
@@ -415,10 +415,22 @@ Status StressTest::AssertSame(DB* db, ColumnFamilyHandle* cf,
   return Status::OK();
 }
 
-void StressTest::VerificationAbort(SharedState* shared, std::string msg,
-                                   Status s) const {
-  fprintf(stderr, "Verification failed: %s. Status is %s\n", msg.c_str(),
-          s.ToString().c_str());
+void StressTest::ProcessStatus(SharedState* shared, std::string opname,
+                               Status s) const {
+  if (s.ok()) {
+    return;
+  }
+  if (!s.IsIOError() || !std::strstr(s.getState(), "injected")) {
+    std::ostringstream oss;
+    oss << opname << " failed: " << s.ToString();
+    VerificationAbort(shared, oss.str());
+    assert(false);
+  }
+  fprintf(stdout, "%s failed: %s\n", opname.c_str(), s.ToString().c_str());
+}
+
+void StressTest::VerificationAbort(SharedState* shared, std::string msg) const {
+  fprintf(stderr, "Verification failed: %s\n", msg.c_str());
   shared->SetVerificationFailure();
 }
 
@@ -910,35 +922,24 @@ void StressTest::OperateDb(ThreadState* thread) {
       if (thread->rand.OneInOpt(FLAGS_get_live_files_one_in) &&
           !FLAGS_write_fault_one_in) {
         Status status = VerifyGetLiveFiles();
-        if (!status.ok()) {
-          VerificationAbort(shared, "VerifyGetLiveFiles status not OK", status);
-        }
+        ProcessStatus(shared, "VerifyGetLiveFiles", status);
       }
 
       // Verify GetSortedWalFiles with a 1 in N chance.
       if (thread->rand.OneInOpt(FLAGS_get_sorted_wal_files_one_in)) {
         Status status = VerifyGetSortedWalFiles();
-        if (!status.ok()) {
-          VerificationAbort(shared, "VerifyGetSortedWalFiles status not OK",
-                            status);
-        }
+        ProcessStatus(shared, "VerifyGetSortedWalFiles", status);
       }
 
       // Verify GetCurrentWalFile with a 1 in N chance.
if (thread->rand.OneInOpt(FLAGS_get_current_wal_file_one_in)) { Status status = VerifyGetCurrentWalFile(); - if (!status.ok()) { - VerificationAbort(shared, "VerifyGetCurrentWalFile status not OK", - status); - } + ProcessStatus(shared, "VerifyGetCurrentWalFile", status); } if (thread->rand.OneInOpt(FLAGS_pause_background_one_in)) { Status status = TestPauseBackground(thread); - if (!status.ok()) { - VerificationAbort( - shared, "Pause/ContinueBackgroundWork status not OK", status); - } + ProcessStatus(shared, "Pause/ContinueBackgroundWork", status); } if (thread->rand.OneInOpt(FLAGS_verify_checksum_one_in)) { @@ -947,9 +948,7 @@ void StressTest::OperateDb(ThreadState* thread) { ThreadStatus::OperationType::OP_VERIFY_DB_CHECKSUM); Status status = db_->VerifyChecksum(); ThreadStatusUtil::ResetThreadStatus(); - if (!status.ok()) { - VerificationAbort(shared, "VerifyChecksum status not OK", status); - } + ProcessStatus(shared, "VerifyChecksum", status); } if (thread->rand.OneInOpt(FLAGS_verify_file_checksums_one_in)) { @@ -958,10 +957,7 @@ void StressTest::OperateDb(ThreadState* thread) { ThreadStatus::OperationType::OP_VERIFY_FILE_CHECKSUMS); Status status = db_->VerifyFileChecksums(read_opts); ThreadStatusUtil::ResetThreadStatus(); - if (!status.ok()) { - VerificationAbort(shared, "VerifyFileChecksums status not OK", - status); - } + ProcessStatus(shared, "VerifyFileChecksums", status); } if (thread->rand.OneInOpt(FLAGS_get_property_one_in)) { @@ -988,35 +984,19 @@ void StressTest::OperateDb(ThreadState* thread) { if (total_size <= FLAGS_backup_max_size) { Status s = TestBackupRestore(thread, rand_column_families, rand_keys); - if (!s.ok()) { - if (!s.IsIOError() || !std::strstr(s.getState(), "injected")) { - VerificationAbort(shared, - "Backup/restore gave inconsistent state", s); - } else { - fprintf(stdout, "Backup/restore failed: %s\n", - s.ToString().c_str()); - } - } + ProcessStatus(shared, "Backup/restore", s); } } if (thread->rand.OneInOpt(FLAGS_checkpoint_one_in)) { Status s = TestCheckpoint(thread, rand_column_families, rand_keys); - if (!s.ok()) { - if (!s.IsIOError() || !std::strstr(s.getState(), "injected")) { - VerificationAbort(shared, "Checkpoint gave inconsistent state", s); - } else { - fprintf(stdout, "Checkpoint failed: %s\n", s.ToString().c_str()); - } - } + ProcessStatus(shared, "Checkpoint", s); } if (thread->rand.OneInOpt(FLAGS_approximate_size_one_in)) { Status s = TestApproximateSize(thread, i, rand_column_families, rand_keys); - if (!s.ok()) { - VerificationAbort(shared, "ApproximateSize Failed", s); - } + ProcessStatus(shared, "ApproximateSize", s); } if (thread->rand.OneInOpt(FLAGS_acquire_snapshot_one_in)) { TestAcquireSnapshot(thread, rand_column_family, keystr, i); @@ -1024,9 +1004,7 @@ void StressTest::OperateDb(ThreadState* thread) { /*always*/ { Status s = MaybeReleaseSnapshots(thread, i); - if (!s.ok()) { - VerificationAbort(shared, "Snapshot gave inconsistent state", s); - } + ProcessStatus(shared, "Snapshot", s); } // Assign timestamps if necessary. 
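The `ProcessStatus()` helper introduced above centralizes one policy: an admin operation may now fail with an injected, retryable I/O error (possible while a flush is being fault-injected), and that is logged and tolerated, while any other failure still aborts verification. The following self-contained sketch shows that policy in isolation; the free function and the `main()` harness are hypothetical and not part of the patch, only the `IsIOError()`/`getState()` checks mirror the diff.

```cpp
// Illustrative sketch of the acceptance policy applied by ProcessStatus():
// tolerate IOErrors whose message marks them as injected, fail everything else.
#include <cstdio>
#include <cstring>

#include "rocksdb/status.h"

using ROCKSDB_NAMESPACE::Status;

// Hypothetical free-function equivalent of the check inside ProcessStatus().
bool IsToleratedInjectedError(const Status& s) {
  // getState() can be null for statuses built without a message, so guard it
  // here even though injected errors in the stress test always carry one.
  return s.IsIOError() && s.getState() != nullptr &&
         std::strstr(s.getState(), "injected") != nullptr;
}

int main() {
  Status injected = Status::IOError("injected error when reading from file");
  Status other = Status::Corruption("block checksum mismatch");
  std::printf("injected IOError tolerated: %d\n",
              IsToleratedInjectedError(injected));  // 1: logged and ignored
  std::printf("corruption tolerated:       %d\n",
              IsToleratedInjectedError(other));     // 0: VerificationAbort()
  return 0;
}
```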
diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index 3008f0366a0..fad4926aa42 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -224,7 +224,9 @@ class StressTest { return Status::NotSupported("TestCustomOperations() must be overridden"); } - void VerificationAbort(SharedState* shared, std::string msg, Status s) const; + void ProcessStatus(SharedState* shared, std::string msg, Status s) const; + + void VerificationAbort(SharedState* shared, std::string msg) const; void VerificationAbort(SharedState* shared, std::string msg, int cf, int64_t key) const; diff --git a/db_stress_tool/multi_ops_txns_stress.cc b/db_stress_tool/multi_ops_txns_stress.cc index 1591a52e998..c7d38339bb2 100644 --- a/db_stress_tool/multi_ops_txns_stress.cc +++ b/db_stress_tool/multi_ops_txns_stress.cc @@ -1104,8 +1104,9 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const { Status s = record.DecodePrimaryIndexEntry(it->key(), it->value()); if (!s.ok()) { oss << "Cannot decode primary index entry " << it->key().ToString(true) - << "=>" << it->value().ToString(true); - VerificationAbort(thread->shared, oss.str(), s); + << "=>" << it->value().ToString(true) << ". Status is " + << s.ToString(); + VerificationAbort(thread->shared, oss.str()); assert(false); return; } @@ -1125,8 +1126,9 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const { std::string value; s = db_->Get(ropts, sk, &value); if (!s.ok()) { - oss << "Cannot find secondary index entry " << sk.ToString(true); - VerificationAbort(thread->shared, oss.str(), s); + oss << "Cannot find secondary index entry " << sk.ToString(true) + << ". Status is " << s.ToString(); + VerificationAbort(thread->shared, oss.str()); assert(false); return; } @@ -1153,8 +1155,9 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const { Status s = record.DecodeSecondaryIndexEntry(it->key(), it->value()); if (!s.ok()) { oss << "Cannot decode secondary index entry " - << it->key().ToString(true) << "=>" << it->value().ToString(true); - VerificationAbort(thread->shared, oss.str(), s); + << it->key().ToString(true) << "=>" << it->value().ToString(true) + << ". Status is " << s.ToString(); + VerificationAbort(thread->shared, oss.str()); assert(false); return; } @@ -1168,7 +1171,7 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const { if (!s.ok()) { oss << "Error searching pk " << Slice(pk).ToString(true) << ". " << s.ToString() << ". sk " << it->key().ToString(true); - VerificationAbort(thread->shared, oss.str(), s); + VerificationAbort(thread->shared, oss.str()); assert(false); return; } @@ -1176,8 +1179,8 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const { s = std::get<0>(result); if (!s.ok()) { oss << "Error decoding primary index value " - << Slice(value).ToString(true) << ". " << s.ToString(); - VerificationAbort(thread->shared, oss.str(), s); + << Slice(value).ToString(true) << ". 
Status is " << s.ToString(); + VerificationAbort(thread->shared, oss.str()); assert(false); return; } @@ -1187,7 +1190,7 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const { << Slice(value).ToString(true) << " (a=" << record.a_value() << ", c=" << c_in_primary << "), sk: " << it->key().ToString(true) << " (c=" << record.c_value() << ")"; - VerificationAbort(thread->shared, oss.str(), s); + VerificationAbort(thread->shared, oss.str()); assert(false); return; } @@ -1198,7 +1201,7 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const { oss << "Pk/sk mismatch: primary index has " << primary_index_entries_count << " entries. Secondary index has " << secondary_index_entries_count << " entries."; - VerificationAbort(thread->shared, oss.str(), Status::OK()); + VerificationAbort(thread->shared, oss.str()); assert(false); return; } diff --git a/unreleased_history/behavior_changes/stopped_writes_wait_for_recovery.md b/unreleased_history/behavior_changes/stopped_writes_wait_for_recovery.md new file mode 100644 index 00000000000..2c44d55721f --- /dev/null +++ b/unreleased_history/behavior_changes/stopped_writes_wait_for_recovery.md @@ -0,0 +1 @@ +* During a write stop, writes now block on in-progress recovery attempts