forked from facebook/rocksdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
db_flush_test.cc
125 lines (110 loc) · 4.28 KB
/
db_flush_test.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "util/fault_injection_test_env.h"
#include "util/sync_point.h"
namespace rocksdb {
class DBFlushTest : public DBTestBase {
public:
DBFlushTest() : DBTestBase("/db_flush_test") {}
};
class DBFlushDirectIOTest : public DBFlushTest,
public ::testing::WithParamInterface<bool> {
public:
DBFlushDirectIOTest() : DBFlushTest() {}
};
// We had issue when two background threads trying to flush at the same time,
// only one of them get committed. The test verifies the issue is fixed.
TEST_F(DBFlushTest, FlushWhileWritingManifest) {
Options options;
options.disable_auto_compactions = true;
options.max_background_flushes = 2;
options.env = env_;
Reopen(options);
FlushOptions no_wait;
no_wait.wait = false;
SyncPoint::GetInstance()->LoadDependency(
{{"VersionSet::LogAndApply:WriteManifest",
"DBFlushTest::FlushWhileWritingManifest:1"},
{"MemTableList::InstallMemtableFlushResults:InProgress",
"VersionSet::LogAndApply:WriteManifestDone"}});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put("foo", "v"));
ASSERT_OK(dbfull()->Flush(no_wait));
TEST_SYNC_POINT("DBFlushTest::FlushWhileWritingManifest:1");
ASSERT_OK(Put("bar", "v"));
ASSERT_OK(dbfull()->Flush(no_wait));
// If the issue is hit we will wait here forever.
dbfull()->TEST_WaitForFlushMemTable();
#ifndef ROCKSDB_LITE
ASSERT_EQ(2, TotalTableFiles());
#endif // ROCKSDB_LITE
}
TEST_F(DBFlushTest, SyncFail) {
std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
new FaultInjectionTestEnv(env_));
Options options;
options.disable_auto_compactions = true;
options.env = fault_injection_env.get();
SyncPoint::GetInstance()->LoadDependency(
{{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
{"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}});
SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
Put("key", "value");
auto* cfd =
reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
->cfd();
int refs_before = cfd->current()->TEST_refs();
FlushOptions flush_options;
flush_options.wait = false;
ASSERT_OK(dbfull()->Flush(flush_options));
fault_injection_env->SetFilesystemActive(false);
TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
fault_injection_env->SetFilesystemActive(true);
dbfull()->TEST_WaitForFlushMemTable();
#ifndef ROCKSDB_LITE
ASSERT_EQ("", FilesPerLevel()); // flush failed.
#endif // ROCKSDB_LITE
// Flush job should release ref count to current version.
ASSERT_EQ(refs_before, cfd->current()->TEST_refs());
Destroy(options);
}
TEST_P(DBFlushDirectIOTest, DirectIO) {
Options options;
options.create_if_missing = true;
options.disable_auto_compactions = true;
options.max_background_flushes = 2;
options.use_direct_io_for_flush_and_compaction = GetParam();
options.env = new MockEnv(Env::Default());
SyncPoint::GetInstance()->SetCallBack(
"BuildTable:create_file", [&](void* arg) {
bool* use_direct_writes = static_cast<bool*>(arg);
ASSERT_EQ(*use_direct_writes,
options.use_direct_io_for_flush_and_compaction);
});
SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
ASSERT_OK(Put("foo", "v"));
FlushOptions flush_options;
flush_options.wait = true;
ASSERT_OK(dbfull()->Flush(flush_options));
Destroy(options);
delete options.env;
}
INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
testing::Bool());
} // namespace rocksdb
int main(int argc, char** argv) {
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}