Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto refresh iterator with snapshot #13354

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Prev Previous commit
Next Next commit
Hook new feature into db_stress
  • Loading branch information
mszeszko-meta committed Feb 10, 2025
commit 2a64725d8ef36330b3f8c9c7594a14fa170b4bc7
10 changes: 9 additions & 1 deletion db_stress_tool/cf_consistency_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,11 @@ class CfConsistencyStressTest : public StressTest {
Slice ub_slice;

ReadOptions ro_copy = readoptions;
std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
if (ro_copy.auto_refresh_iterator_with_snapshot) {
snapshot = std::make_unique<ManagedSnapshot>(db_);
ro_copy.snapshot = snapshot->snapshot();
}

// Get the next prefix first and then see if we want to set upper bound.
// We'll use the next prefix in an assertion later on
Expand Down Expand Up @@ -858,6 +863,8 @@ class CfConsistencyStressTest : public StressTest {

ManagedSnapshot snapshot_guard(db_);
options.snapshot = snapshot_guard.snapshot();
options.auto_refresh_iterator_with_snapshot =
FLAGS_auto_refresh_iterator_with_snapshot;

const size_t num = column_families_.size();

Expand Down Expand Up @@ -1083,8 +1090,9 @@ class CfConsistencyStressTest : public StressTest {
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
ReadOptions ropts(FLAGS_verify_checksum, true);
ropts.total_order_seek = true;
if (nullptr == secondary_db_) {
if (nullptr == secondary_db_ || FLAGS_auto_refresh_iterator_with_snapshot) {
ropts.snapshot = snapshot_guard.snapshot();
ropts.auto_refresh_iterator_with_snapshot = true;
}
uint32_t crc = 0;
{
Expand Down
1 change: 1 addition & 0 deletions db_stress_tool/db_stress_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ DECLARE_string(file_temperature_age_thresholds);
DECLARE_uint32(commit_bypass_memtable_one_in);
DECLARE_bool(track_and_verify_wals);
DECLARE_bool(enable_remote_compaction);
DECLARE_bool(auto_refresh_iterator_with_snapshot);

constexpr long KB = 1024;
constexpr int kRandomValueMaxFactor = 3;
Expand Down
6 changes: 6 additions & 0 deletions db_stress_tool/db_stress_gflags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1479,4 +1479,10 @@ DEFINE_bool(paranoid_memory_checks,
DEFINE_uint32(commit_bypass_memtable_one_in, 0,
"If greater than zero, transaction option will set "
"commit_bypass_memtable to per every N transactions on average.");

DEFINE_bool(
auto_refresh_iterator_with_snapshot,
ROCKSDB_NAMESPACE::ReadOptions().auto_refresh_iterator_with_snapshot,
"ReadOptions.auto_refresh_iterator_with_snapshot");

#endif // GFLAGS
15 changes: 14 additions & 1 deletion db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,8 @@ Status StressTest::AssertSame(DB* db, ColumnFamilyHandle* cf,
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
ReadOptions ropt;
ropt.snapshot = snap_state.snapshot;
ropt.auto_refresh_iterator_with_snapshot =
FLAGS_auto_refresh_iterator_with_snapshot;
Slice ts;
if (!snap_state.timestamp.empty()) {
ts = snap_state.timestamp;
Expand Down Expand Up @@ -954,6 +956,8 @@ void StressTest::OperateDb(ThreadState* thread) {
read_opts.fill_cache = FLAGS_fill_cache;
read_opts.optimize_multiget_for_io = FLAGS_optimize_multiget_for_io;
read_opts.allow_unprepared_value = FLAGS_allow_unprepared_value;
read_opts.auto_refresh_iterator_with_snapshot =
FLAGS_auto_refresh_iterator_with_snapshot;

WriteOptions write_opts;
if (FLAGS_rate_limit_auto_wal_flush) {
Expand Down Expand Up @@ -1734,6 +1738,8 @@ Status StressTest::TestIterateImpl(ThreadState* thread,
cmp_ro.timestamp = ro.timestamp;
cmp_ro.iter_start_ts = ro.iter_start_ts;
cmp_ro.snapshot = snapshot_guard.snapshot();
cmp_ro.auto_refresh_iterator_with_snapshot =
ro.auto_refresh_iterator_with_snapshot;
cmp_ro.total_order_seek = true;

ColumnFamilyHandle* const cmp_cfh =
Expand Down Expand Up @@ -1962,7 +1968,10 @@ void StressTest::VerifyIterator(
<< (ro.iterate_lower_bound
? ro.iterate_lower_bound->ToString(true).c_str()
: "")
<< ", allow_unprepared_value: " << ro.allow_unprepared_value;
<< ", allow_unprepared_value: " << ro.allow_unprepared_value
<< ", auto_refresh_iterator_with_snapshot"
<< ro.auto_refresh_iterator_with_snapshot << ", snapshot: "
<< ((ro.snapshot == nullptr) ? "nullptr" : "well-defined");

if (iter->Valid() && !cmp_iter->Valid()) {
if (pe != nullptr) {
Expand Down Expand Up @@ -2918,6 +2927,8 @@ void StressTest::TestAcquireSnapshot(ThreadState* thread,
ww_snapshot ? db_impl->GetSnapshotForWriteConflictBoundary()
: db_->GetSnapshot();
ropt.snapshot = snapshot;
ropt.auto_refresh_iterator_with_snapshot =
FLAGS_auto_refresh_iterator_with_snapshot;

// Ideally, we want snapshot taking and timestamp generation to be atomic
// here, so that the snapshot corresponds to the timestamp. However, it is
Expand Down Expand Up @@ -3127,6 +3138,8 @@ uint32_t StressTest::GetRangeHash(ThreadState* thread, const Snapshot* snapshot,
ReadOptions ro;
ro.snapshot = snapshot;
ro.total_order_seek = true;
ro.auto_refresh_iterator_with_snapshot =
FLAGS_auto_refresh_iterator_with_snapshot;
std::string ts_str;
Slice ts;
if (FLAGS_user_timestamp_size > 0) {
Expand Down
24 changes: 23 additions & 1 deletion db_stress_tool/multi_ops_txns_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,14 @@ void MultiOpsTxnsStressTest::ReopenAndPreloadDbIfNeeded(SharedState* shared) {
(void)shared;
bool db_empty = false;
{
std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
ReadOptions ropt;
std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
if (FLAGS_auto_refresh_iterator_with_snapshot) {
snapshot = std::make_unique<ManagedSnapshot>(db_);
ropt.snapshot = snapshot->snapshot();
ropt.auto_refresh_iterator_with_snapshot = true;
}
std::unique_ptr<Iterator> iter(db_->NewIterator(ropt));
iter->SeekToFirst();
if (!iter->Valid()) {
db_empty = true;
Expand Down Expand Up @@ -767,6 +774,8 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread,
Slice iter_ub = iter_ub_str;
ReadOptions ropts;
ropts.snapshot = txn->GetSnapshot();
ropts.auto_refresh_iterator_with_snapshot =
FLAGS_auto_refresh_iterator_with_snapshot;
ropts.total_order_seek = true;
ropts.iterate_upper_bound = &iter_ub;
ropts.rate_limiter_priority =
Expand Down Expand Up @@ -1136,6 +1145,8 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const {
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
ReadOptions ropts;
ropts.snapshot = snapshot;
ropts.auto_refresh_iterator_with_snapshot =
FLAGS_auto_refresh_iterator_with_snapshot;
ropts.total_order_seek = true;
ropts.iterate_upper_bound = &iter_ub;
if (FLAGS_use_sqfc_for_range_queries) {
Expand Down Expand Up @@ -1191,6 +1202,8 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const {
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
ReadOptions ropts;
ropts.snapshot = snapshot;
ropts.auto_refresh_iterator_with_snapshot =
FLAGS_auto_refresh_iterator_with_snapshot;
ropts.total_order_seek = true;

std::unique_ptr<Iterator> it(db_->NewIterator(ropts));
Expand Down Expand Up @@ -1289,6 +1302,8 @@ void MultiOpsTxnsStressTest::VerifyPkSkFast(const ReadOptions& read_options,
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
ReadOptions ropts;
ropts.snapshot = snapshot;
ropts.auto_refresh_iterator_with_snapshot =
FLAGS_auto_refresh_iterator_with_snapshot;
ropts.total_order_seek = true;
ropts.io_activity = read_options.io_activity;

Expand Down Expand Up @@ -1643,6 +1658,13 @@ void MultiOpsTxnsStressTest::ScanExistingDb(SharedState* shared, int threads) {

assert(db_);
ReadOptions ropts;
std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
if (FLAGS_auto_refresh_iterator_with_snapshot) {
snapshot = std::make_unique<ManagedSnapshot>(db_);
ropts.snapshot = snapshot->snapshot();
ropts.auto_refresh_iterator_with_snapshot = true;
}

std::vector<KeySet> existing_a_uniqs(threads);
std::vector<KeySet> non_existing_a_uniqs(threads);
std::vector<KeySet> existing_c_uniqs(threads);
Expand Down
28 changes: 28 additions & 0 deletions db_stress_tool/no_batched_ops_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ class NonBatchedOpsStressTest : public StressTest {
// This `ReadOptions` is for validation purposes. Ignore
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
ReadOptions options(FLAGS_verify_checksum, true);
std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
if (FLAGS_auto_refresh_iterator_with_snapshot) {
snapshot = std::make_unique<ManagedSnapshot>(db_);
options.snapshot = snapshot->snapshot();
options.auto_refresh_iterator_with_snapshot = true;
}

std::string ts_str;
Slice ts;
if (FLAGS_user_timestamp_size > 0) {
Expand Down Expand Up @@ -469,6 +476,13 @@ class NonBatchedOpsStressTest : public StressTest {
read_opts.timestamp = &ts;
}

std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
if (FLAGS_auto_refresh_iterator_with_snapshot) {
snapshot = std::make_unique<ManagedSnapshot>(db_);
read_opts.snapshot = snapshot->snapshot();
read_opts.auto_refresh_iterator_with_snapshot = true;
}

static Random64 rand64(shared->GetSeed());

{
Expand Down Expand Up @@ -1561,6 +1575,13 @@ class NonBatchedOpsStressTest : public StressTest {
Slice ub_slice;
ReadOptions ro_copy = read_opts;

std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
if (ro_copy.auto_refresh_iterator_with_snapshot) {
snapshot = std::make_unique<ManagedSnapshot>(db_);
ro_copy.snapshot = snapshot->snapshot();
ro_copy.auto_refresh_iterator_with_snapshot = true;
}

// Randomly test with `iterate_upper_bound` and `prefix_same_as_start`
//
// Get the next prefix first and then see if we want to set it to be the
Expand Down Expand Up @@ -2340,6 +2361,13 @@ class NonBatchedOpsStressTest : public StressTest {
}

ReadOptions ro(read_opts);
std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
if (ro.auto_refresh_iterator_with_snapshot) {
snapshot = std::make_unique<ManagedSnapshot>(db_);
ro.snapshot = snapshot->snapshot();
ro.auto_refresh_iterator_with_snapshot = true;
}

if (FLAGS_prefix_size > 0) {
ro.total_order_seek = true;
}
Expand Down
1 change: 1 addition & 0 deletions tools/db_crashtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@
"allow_unprepared_value": lambda: random.choice([0, 1]),
"track_and_verify_wals": lambda: random.choice([0, 1]),
"enable_remote_compaction": lambda: random.choice([0, 1]),
"auto_refresh_iterator_with_snapshot": lambda: random.choice([0, 1]),
}
_TEST_DIR_ENV_VAR = "TEST_TMPDIR"
# If TEST_TMPDIR_EXPECTED is not specified, default value will be TEST_TMPDIR
Expand Down
Loading