log_block_manager: fix corruption after re-opening compacted metadata
This fixes an issue discovered on a cluster due to the following
sequence of events:

- a block manager compacts a metadata file while starting up
- when it reopens the metadata file after replacing it with the
  compacted one, it gets a file_cache hit. Thus, the WritablePBContainer
  continues to write to the _deleted_ file instead of the compacted one
  (see the standalone sketch after this list).
  Metadata entries at this point are lost (which could cause block loss
  in the case of lost CREATE records, or dangling blocks in the case of
  lost DELETEs)
- if the server continues to run for a while, the FD will be evicted
  from the cache and eventually re-opened. At that point, a further
  DELETE record could end up writing to an offset past the end of the
  file, since the write offset was incremented by the "lost" records
  above.
- on the next restart, the metadata file would have a "gap" of zero
  bytes, which would surface as a checksum failure and prevent the
  server from starting up.
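
The underlying hazard is ordinary POSIX rename semantics: a descriptor
opened before the rename keeps referring to the old, now-unlinked inode,
so appends through it never reach the file visible at the path. A minimal
standalone sketch of that failure mode (file names are illustrative; this
is not Kudu code):

    #include <cstdio>      // std::rename()
    #include <fcntl.h>     // open()
    #include <unistd.h>    // write(), close()

    int main() {
      // Model the cached descriptor: open the metadata file for appending
      // and hold the fd across the compaction.
      int cached_fd = open("meta", O_WRONLY | O_CREAT | O_APPEND, 0644);

      // Model the compaction: write a replacement file, then rename it
      // over the original path.
      int tmp_fd = open("meta.tmp", O_WRONLY | O_CREAT | O_TRUNC, 0644);
      (void)write(tmp_fd, "compacted", 9);
      close(tmp_fd);
      std::rename("meta.tmp", "meta");

      // This append goes to the old, unlinked inode -- not to the file now
      // visible at "meta" -- so the record is effectively lost.
      (void)write(cached_fd, "new record", 10);
      close(cached_fd);
      return 0;
    }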

The fix is relatively simple: when we replace the metadata file we need
to invalidate and evict the cache entry so that when we "reopen", it
actually starts appending to the _new_ file and not the old deleted one.
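
A condensed sketch of that ordering, simplified from the
RewriteMetadataFile() change in the diff below (the helper and the exact
signatures are illustrative, not the actual Kudu code):

    #include <string>

    #include "kudu/util/env.h"
    #include "kudu/util/file_cache.h"
    #include "kudu/util/status.h"

    using kudu::Env;
    using kudu::FileCache;
    using kudu::RWFile;
    using kudu::Status;
    using std::string;

    // Hypothetical helper assumed for this sketch: writes the compacted
    // records to 'tmp_path'.
    Status WriteCompactedMetadata(Env* env, const string& tmp_path);

    // The essential ordering: rewrite to a temporary file, rename it over
    // the old path, then invalidate the cached descriptor so that the next
    // open through the cache appends to the new file.
    Status RewriteAndSwapMetadata(Env* env, FileCache<RWFile>* cache,
                                  const string& path) {
      const string tmp = path + ".kudutmp";
      RETURN_NOT_OK(WriteCompactedMetadata(env, tmp));
      RETURN_NOT_OK(env->RenameFile(tmp, path));
      cache->Invalidate(path);  // evict the stale fd for the replaced file
      return Status::OK();
    }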

The bulk of the changes here are to tests:
- the stress test now enforces a minimum number of live blocks before it
  starts deleting any. It also compacts more aggressively and uses a
  smaller file cache. With these changes, I was sometimes able to
  reproduce the issue.
- A more targeted test issues a canned sequence of block creations and
  deletions that can reliably reproduce the above issue.

Change-Id: I14b2c64685e24d27591258911db4aeb9e8020a4d
Reviewed-on: http://gerrit.cloudera.org:8080/7113
Reviewed-by: Adar Dembo <[email protected]>
Tested-by: Kudu Jenkins
toddlipcon committed Jun 8, 2017
1 parent 8be2a59 commit e77538b
Showing 6 changed files with 218 additions and 66 deletions.
45 changes: 30 additions & 15 deletions src/kudu/fs/block_manager-stress-test.cc
@@ -43,10 +43,13 @@ DECLARE_int64(block_manager_max_open_files);
DECLARE_uint64(log_container_max_size);
DECLARE_uint64(log_container_preallocate_bytes);

DEFINE_int32(test_duration_secs, 2, "Number of seconds to run the test");
DEFINE_double(test_duration_secs, 2, "Number of seconds to run the test");
DEFINE_int32(num_writer_threads, 4, "Number of writer threads to run");
DEFINE_int32(num_reader_threads, 8, "Number of reader threads to run");
DEFINE_int32(num_deleter_threads, 1, "Number of deleter threads to run");
DEFINE_int32(minimum_live_blocks_for_delete, 1000,
"If there are fewer than this number of live blocks, the deleter "
"threads will not delete any");
DEFINE_int32(block_group_size, 8, "Number of blocks to write per block "
"group. Must be power of 2");
DEFINE_int32(block_group_bytes, 32 * 1024,
@@ -104,13 +107,13 @@ class BlockManagerStressTest : public KuduTest {
FLAGS_log_container_preallocate_bytes = 1 * 1024 * 1024;

// Ensure the file cache is under stress too.
FLAGS_block_manager_max_open_files = 512;
FLAGS_block_manager_max_open_files = 32;

// Maximize the amount of cleanup triggered by the extra space heuristic.
FLAGS_log_container_excess_space_before_cleanup_fraction = 0.0;

// Compact block manager metadata aggressively.
FLAGS_log_container_live_metadata_before_compact_ratio = 0.80;
FLAGS_log_container_live_metadata_before_compact_ratio = 0.99;

if (FLAGS_block_manager_paths.empty()) {
data_dirs_.push_back(test_dir_);
@@ -147,7 +150,7 @@ class BlockManagerStressTest : public KuduTest {
return new T(env_, opts);
}

void RunTest(int secs) {
void RunTest(double secs) {
LOG(INFO) << "Starting all threads";
this->StartThreads();
SleepFor(MonoDelta::FromSeconds(secs));
@@ -389,8 +392,15 @@ void BlockManagerStressTest<T>::DeleterThread() {
// Grab a block at random.
BlockId to_delete;
{
std::lock_guard<simple_spinlock> l(lock_);
if (written_blocks_.empty()) {
std::unique_lock<simple_spinlock> l(lock_);
// If we only have a small number of live blocks, don't delete any.
// This ensures that, when we restart, we always have a reasonable
// amount of data -- otherwise the deletion threads are likely to
// "keep up" with the writer threads and every restart will consist
// of a very small number of non-dead containers.
if (written_blocks_.size() < FLAGS_minimum_live_blocks_for_delete) {
l.unlock();
SleepFor(MonoDelta::FromMilliseconds(10));
continue;
}

@@ -460,6 +470,8 @@ TYPED_TEST(BlockManagerStressTest, StressTest) {
OverrideFlagForSlowTests("block_group_size", "16");
OverrideFlagForSlowTests("num_inconsistencies", "128");

const int kNumStarts = 3;

if ((FLAGS_block_group_size & (FLAGS_block_group_size - 1)) != 0) {
LOG(FATAL) << "block_group_size " << FLAGS_block_group_size
<< " is not a power of 2";
@@ -469,16 +481,19 @@

LOG(INFO) << "Running on fresh block manager";
checker.Start();
this->RunTest(FLAGS_test_duration_secs / 2);
this->RunTest(FLAGS_test_duration_secs / kNumStarts);
NO_FATALS(this->InjectNonFatalInconsistencies());
LOG(INFO) << "Running on populated block manager";
this->bm_.reset(this->CreateBlockManager());
FsReport report;
ASSERT_OK(this->bm_->Open(&report));
ASSERT_OK(this->bm_->dd_manager()->LoadDataDirGroupFromPB(this->test_tablet_name_,
this->test_group_pb_));
ASSERT_OK(report.LogAndCheckForFatalErrors());
this->RunTest(FLAGS_test_duration_secs / 2);

for (int i = 1; i < kNumStarts; i++) {
LOG(INFO) << "Running on populated block manager (restart #" << i << ")";
this->bm_.reset(this->CreateBlockManager());
FsReport report;
ASSERT_OK(this->bm_->Open(&report));
ASSERT_OK(this->bm_->dd_manager()->LoadDataDirGroupFromPB(this->test_tablet_name_,
this->test_group_pb_));
ASSERT_OK(report.LogAndCheckForFatalErrors());
this->RunTest(FLAGS_test_duration_secs / kNumStarts);
}
checker.Stop();

LOG(INFO) << "Printing test totals";
56 changes: 56 additions & 0 deletions src/kudu/fs/log_block_manager-test.cc
@@ -44,6 +44,8 @@ using std::unordered_set;
using std::vector;
using strings::Substitute;

DECLARE_int64(block_manager_max_open_files);
DECLARE_bool(cache_force_single_shard);
DECLARE_double(log_container_excess_space_before_cleanup_fraction);
DECLARE_double(log_container_live_metadata_before_compact_ratio);
DECLARE_int64(log_container_max_blocks);
@@ -1198,5 +1200,59 @@ TEST_F(LogBlockManagerTest, TestCompactFullContainerMetadataAtStartup) {
ASSERT_EQ(last_live_aligned_bytes, report.stats.live_block_bytes_aligned);
}

// Regression test for a bug in which, after a metadata file was compacted,
// we would not properly handle appending to the new (post-compaction) metadata.
//
// The bug was related to a stale file descriptor left in the file_cache, so
// this test explicitly targets that scenario.
TEST_F(LogBlockManagerTest, TestDeleteFromContainerAfterMetadataCompaction) {
// Compact aggressively.
FLAGS_log_container_live_metadata_before_compact_ratio = 0.99;
// Use a small file cache (smaller than the number of containers).
FLAGS_block_manager_max_open_files = 50;
// Use a single shard so that we have an accurate max cache capacity
// regardless of the number of cores on the machine.
FLAGS_cache_force_single_shard = true;
// Use very small containers, so that we generate a lot of them (and thus
// consume a lot of file descriptors).
FLAGS_log_container_max_blocks = 4;
// Reopen so the flags take effect.
ASSERT_OK(ReopenBlockManager(nullptr));

// Create many containers with a bunch of blocks, half of which are deleted.
vector<BlockId> block_ids;
for (int i = 0; i < 1000; i++) {
unique_ptr<WritableBlock> block;
ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block));
ASSERT_OK(block->Close());
if (i % 2 == 1) {
ASSERT_OK(bm_->DeleteBlock(block->id()));
} else {
block_ids.emplace_back(block->id());
}
}

// Reopen the block manager. This will cause it to compact all of the metadata
// files, since we've deleted half the blocks in every container and the
// threshold is set high above.
FsReport report;
ASSERT_OK(ReopenBlockManager(&report));

// Delete the remaining blocks in a random order. This will append to metadata
// files which have just been compacted. Since we have more metadata files than
// we have file_cache capacity, this will also generate a mix of cache hits,
// misses, and re-insertions.
std::random_shuffle(block_ids.begin(), block_ids.end());
for (const BlockId& b : block_ids) {
ASSERT_OK(bm_->DeleteBlock(b));
}

// Reopen to make sure that the metadata can be properly loaded and
// that the resulting block manager is empty.
ASSERT_OK(ReopenBlockManager(&report));
ASSERT_EQ(0, report.stats.live_block_count);
ASSERT_EQ(0, report.stats.live_block_bytes_aligned);
}

} // namespace fs
} // namespace kudu
12 changes: 9 additions & 3 deletions src/kudu/fs/log_block_manager.cc
@@ -2255,9 +2255,8 @@ Status LogBlockManager::Repair(

// Rewrite this metadata file. Failures are non-fatal.
int64_t file_bytes_delta;
Status s = RewriteMetadataFile(StrCat(e.first, kContainerMetadataFileSuffix),
e.second,
&file_bytes_delta);
const auto& meta_path = StrCat(e.first, kContainerMetadataFileSuffix);
Status s = RewriteMetadataFile(meta_path, e.second, &file_bytes_delta);
if (!s.ok()) {
WARN_NOT_OK(s, "could not rewrite metadata file");
continue;
@@ -2269,6 +2268,9 @@

metadata_files_compacted++;
metadata_bytes_delta += file_bytes_delta;
VLOG(1) << "Compacted metadata file " << meta_path
<< " (saved " << file_bytes_delta << " bytes)";

}

// The data directory can be synchronized once for all of the new metadata files.
@@ -2322,6 +2324,10 @@ Status LogBlockManager::RewriteMetadataFile(const string& metadata_file_name,
"could not get file size of temporary metadata file");
RETURN_NOT_OK_PREPEND(env_->RenameFile(tmp_file_name, metadata_file_name),
"could not rename temporary metadata file");
// Evict the old path from the file cache, so that when we re-open the new
// metadata file for write, we don't accidentally get a cache hit on the
// old file descriptor pointing to the now-deleted old version.
file_cache_.Invalidate(metadata_file_name);

tmp_deleter.Cancel();
*file_bytes_delta = (static_cast<int64_t>(old_metadata_size) - new_metadata_size);
34 changes: 34 additions & 0 deletions src/kudu/util/file_cache-test.cc
@@ -223,6 +223,40 @@ TYPED_TEST(FileCacheTest, TestDeletion) {
ASSERT_EQ(this->initial_open_fds_, CountOpenFds(this->env_));
}

TYPED_TEST(FileCacheTest, TestInvalidation) {
const string kFile1 = this->GetTestPath("foo");
const string kData1 = "test data 1";
ASSERT_OK(this->WriteTestFile(kFile1, kData1));

// Open the file.
shared_ptr<TypeParam> f;
ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f));

// Write a new file and rename it in place on top of file1.
const string kFile2 = this->GetTestPath("foo2");
const string kData2 = "test data 2 (longer than original)";
ASSERT_OK(this->WriteTestFile(kFile2, kData2));
ASSERT_OK(this->env_->RenameFile(kFile2, kFile1));

// We should still be able to access the file, since it has a cached fd.
uint64_t size;
ASSERT_OK(f->Size(&size));
ASSERT_EQ(kData1.size(), size);

// If we invalidate it from the cache and try again, it should crash because
// the existing descriptor was invalidated.
this->cache_->Invalidate(kFile1);
ASSERT_DEATH({ f->Size(&size); }, "invalidated");

// But if we re-open the path again, the new descriptor should read the
// new data.
shared_ptr<TypeParam> f2;
ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f2));
ASSERT_OK(f2->Size(&size));
ASSERT_EQ(kData2.size(), size);
}


TYPED_TEST(FileCacheTest, TestHeavyReads) {
const int kNumFiles = 20;
const int kNumIterations = 100;