Skip to content

Commit

Permalink
KUDU-1853 (redux). tablet copy: Don't orphan blocks on failure
Browse files Browse the repository at this point in the history
Previously, if a tablet copy failed we would orphan data blocks. This
patch makes it so that a failed tablet copy operation that does not
involve a process crash does not orphan data blocks.

This also refactors some deletion logic out of TSTabletManager so that
TabletCopyClient will tombstone partially-copied tablets when the copy
operation fails.

This version of the patch addresses and tests for a data loss issue
(KUDU-1968) that was merged along with a previous version of this patch,
and released with Kudu 1.3.0, before it was reverted in Kudu 1.3.1.

Additional changes in the revised patch:

* Don't check that block ids do not overlap between source and
  destination in tablet_copy_client-test since that's not guaranteed.
* Attempt to detect data loss in tablet_copy_client-test and fail the
  test if detected. In the case of the log block manager, this check is
  currently not reliable due to KUDU-1980.
* Rename old_superblock_ to remote_superblock_ in
  TabletCopyClientSession.

Originally committed as 72541b4 before
being reverted due to KUDU-1968.

Change-Id: Ic999ccd27859ace0d635255e86729dff8e1d8349
Reviewed-on: http://gerrit.cloudera.org:8080/6733
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <[email protected]>
  • Loading branch information
mpercy committed May 4, 2017
1 parent 89806c6 commit 5ae27d7
Show file tree
Hide file tree
Showing 8 changed files with 374 additions and 204 deletions.
4 changes: 2 additions & 2 deletions src/kudu/integration-tests/delete_table-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -652,8 +652,8 @@ TEST_F(DeleteTableITest, TestAutoTombstoneAfterTabletCopyRemoteFails) {
{
vector<ListTabletsResponsePB::StatusAndSchemaPB> status_pbs;
ASSERT_OK(WaitForNumTabletsOnTS(ts, 1, kTimeout, &status_pbs));
ASSERT_STR_CONTAINS(status_pbs[0].tablet_status().last_status(),
"Tombstoned tablet: Tablet Copy: Unable to fetch data from remote peer");
ASSERT_STR_MATCHES(status_pbs[0].tablet_status().last_status(),
"Tablet Copy: Tombstoned tablet .*: Tablet copy aborted");
}

// Now bring the other replicas back, re-elect the previous leader (TS-1),
Expand Down
34 changes: 29 additions & 5 deletions src/kudu/tserver/tablet_copy-test-base.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "kudu/tserver/tablet_server-test-base.h"

#include <string>
#include <vector>

#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/opid_util.h"
Expand All @@ -34,8 +35,6 @@
namespace kudu {
namespace tserver {

using consensus::MinimumOpId;

// Number of times to roll the log.
static const int kNumLogRolls = 2;

Expand All @@ -49,7 +48,7 @@ class TabletCopyTest : public TabletServerTestBase {
// Flush(), Log GC is allowed to eat the logs before we get around to
// starting a tablet copy session.
tablet_peer_->log_anchor_registry()->Register(
MinimumOpId().index(), CURRENT_TEST_NAME(), &anchor_);
consensus::MinimumOpId().index(), CURRENT_TEST_NAME(), &anchor_);
NO_FATALS(GenerateTestData());
}

Expand All @@ -61,10 +60,35 @@ class TabletCopyTest : public TabletServerTestBase {
protected:
// Grab the first column block we find in the SuperBlock.
static BlockId FirstColumnBlockId(const tablet::TabletSuperBlockPB& superblock) {
DCHECK_GT(superblock.rowsets_size(), 0);
const tablet::RowSetDataPB& rowset = superblock.rowsets(0);
DCHECK_GT(rowset.columns_size(), 0);
const tablet::ColumnDataPB& column = rowset.columns(0);
const BlockIdPB& block_id_pb = column.block();
return BlockId::FromPB(block_id_pb);
return BlockId::FromPB(column.block());
}

// Return a vector of the blocks contained in the specified superblock (not
// including orphaned blocks).
static vector<BlockId> ListBlocks(const tablet::TabletSuperBlockPB& superblock) {
vector<BlockId> block_ids;
for (const auto& rowset : superblock.rowsets()) {
for (const auto& col : rowset.columns()) {
block_ids.emplace_back(col.block().id());
}
for (const auto& redos : rowset.redo_deltas()) {
block_ids.emplace_back(redos.block().id());
}
for (const auto& undos : rowset.undo_deltas()) {
block_ids.emplace_back(undos.block().id());
}
if (rowset.has_bloom_block()) {
block_ids.emplace_back(rowset.bloom_block().id());
}
if (rowset.has_adhoc_index_block()) {
block_ids.emplace_back(rowset.adhoc_index_block().id());
}
}
return block_ids;
}

// Check that the contents and CRC32C of a DataChunkPB are equal to a local buffer.
Expand Down
192 changes: 137 additions & 55 deletions src/kudu/tserver/tablet_copy_client-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
// under the License.
#include "kudu/tserver/tablet_copy-test-base.h"

#include <tuple>

#include <glog/stl_logging.h>

#include "kudu/consensus/quorum_util.h"
#include "kudu/gutil/strings/fastmem.h"
#include "kudu/tablet/tablet_bootstrap.h"
Expand All @@ -29,6 +33,7 @@ namespace tserver {

using consensus::GetRaftConfigLeader;
using consensus::RaftPeerPB;
using std::tuple;
using std::unique_ptr;
using tablet::TabletMetadata;

Expand All @@ -44,8 +49,8 @@ class TabletCopyClientTest : public TabletCopyTest {
tablet_peer_->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10.0));
rpc::MessengerBuilder(CURRENT_TEST_NAME()).Build(&messenger_);
client_.reset(new TabletCopyClient(GetTabletId(),
fs_manager_.get(),
messenger_));
fs_manager_.get(),
messenger_));
ASSERT_OK(GetRaftConfigLeader(tablet_peer_->consensus()
->ConsensusState(consensus::CONSENSUS_CONFIG_COMMITTED), &leader_));

Expand Down Expand Up @@ -91,6 +96,12 @@ Status TabletCopyClientTest::CompareFileContents(const string& path1, const stri
return Status::OK();
}

// Implementation test that no blocks exist in the new superblock before fetching.
TEST_F(TabletCopyClientTest, TestNoBlocksAtStart) {
ASSERT_GT(ListBlocks(*client_->remote_superblock_).size(), 0);
ASSERT_EQ(0, ListBlocks(*client_->superblock_).size());
}

// Basic begin / end tablet copy session.
TEST_F(TabletCopyClientTest, TestBeginEndSession) {
ASSERT_OK(client_->FetchAll(nullptr /* no listener */));
Expand All @@ -99,7 +110,7 @@ TEST_F(TabletCopyClientTest, TestBeginEndSession) {

// Basic data block download unit test.
TEST_F(TabletCopyClientTest, TestDownloadBlock) {
BlockId block_id = FirstColumnBlockId(*client_->superblock_);
BlockId block_id = FirstColumnBlockId(*client_->remote_superblock_);
Slice slice;
faststring scratch;

Expand All @@ -113,8 +124,6 @@ TEST_F(TabletCopyClientTest, TestDownloadBlock) {
ASSERT_OK(client_->DownloadBlock(block_id, &new_block_id));

// Ensure it placed the block where we expected it to.
s = ReadLocalBlockFile(fs_manager_.get(), block_id, &scratch, &slice);
ASSERT_TRUE(s.IsNotFound()) << "Expected block not found: " << s.ToString();
ASSERT_OK(ReadLocalBlockFile(fs_manager_.get(), new_block_id, &scratch, &slice));
}

Expand Down Expand Up @@ -174,67 +183,140 @@ TEST_F(TabletCopyClientTest, TestVerifyData) {
LOG(INFO) << "Expected error returned: " << s.ToString();
}

namespace {

vector<BlockId> GetAllSortedBlocks(const tablet::TabletSuperBlockPB& sb) {
vector<BlockId> data_blocks;

for (const tablet::RowSetDataPB& rowset : sb.rowsets()) {
for (const tablet::DeltaDataPB& redo : rowset.redo_deltas()) {
data_blocks.push_back(BlockId::FromPB(redo.block()));
}
for (const tablet::DeltaDataPB& undo : rowset.undo_deltas()) {
data_blocks.push_back(BlockId::FromPB(undo.block()));
}
for (const tablet::ColumnDataPB& column : rowset.columns()) {
data_blocks.push_back(BlockId::FromPB(column.block()));
}
if (rowset.has_bloom_block()) {
data_blocks.push_back(BlockId::FromPB(rowset.bloom_block()));
}
if (rowset.has_adhoc_index_block()) {
data_blocks.push_back(BlockId::FromPB(rowset.adhoc_index_block()));
}
}

std::sort(data_blocks.begin(), data_blocks.end(), BlockIdCompare());
return data_blocks;
}

} // anonymous namespace

TEST_F(TabletCopyClientTest, TestDownloadAllBlocks) {
// Download all the blocks.
ASSERT_OK(client_->DownloadBlocks());

// Verify that the new superblock reflects the changes in block IDs.
//
// As long as block IDs are generated with UUIDs or something equally
// unique, there's no danger of a block in the new superblock somehow
// being assigned the same ID as a block in the existing superblock.
vector<BlockId> old_data_blocks = GetAllSortedBlocks(*client_->superblock_.get());
vector<BlockId> new_data_blocks = GetAllSortedBlocks(*client_->new_superblock_.get());
vector<BlockId> result;
std::set_intersection(old_data_blocks.begin(), old_data_blocks.end(),
new_data_blocks.begin(), new_data_blocks.end(),
std::back_inserter(result), BlockIdCompare());
ASSERT_TRUE(result.empty());
// After downloading blocks, verify that the old and remote and local
// superblock point to the same number of blocks.
vector<BlockId> old_data_blocks = ListBlocks(*client_->remote_superblock_);
vector<BlockId> new_data_blocks = ListBlocks(*client_->superblock_);
ASSERT_EQ(old_data_blocks.size(), new_data_blocks.size());

// Verify that the old blocks aren't found. We're using a different
// FsManager than 'tablet_peer', so the only way an old block could end
// up in ours is due to a tablet copy client bug.
for (const BlockId& block_id : old_data_blocks) {
unique_ptr<fs::ReadableBlock> block;
Status s = fs_manager_->OpenBlock(block_id, &block);
ASSERT_TRUE(s.IsNotFound()) << "Expected block not found: " << s.ToString();
}
// And the new blocks are all present.
// Verify that the new blocks are all present.
for (const BlockId& block_id : new_data_blocks) {
unique_ptr<fs::ReadableBlock> block;
ASSERT_OK(fs_manager_->OpenBlock(block_id, &block));
}
}

enum DownloadBlocks {
kDownloadBlocks, // Fetch blocks from remote.
kNoDownloadBlocks, // Do not fetch blocks from remote.
};
enum DeleteTrigger {
kAbortMethod, // Delete data via Abort().
kDestructor, // Delete data via destructor.
kNoDelete // Don't delete data.
};

struct AbortTestParams {
DownloadBlocks download_blocks;
DeleteTrigger delete_type;
};

class TabletCopyClientAbortTest : public TabletCopyClientTest,
public ::testing::WithParamInterface<
tuple<DownloadBlocks, DeleteTrigger>> {
protected:
// Create the specified number of blocks with junk data for testing purposes.
void CreateTestBlocks(int num_blocks);
};

INSTANTIATE_TEST_CASE_P(BlockDeleteTriggers,
TabletCopyClientAbortTest,
::testing::Combine(
::testing::Values(kDownloadBlocks, kNoDownloadBlocks),
::testing::Values(kAbortMethod, kDestructor, kNoDelete)));

void TabletCopyClientAbortTest::CreateTestBlocks(int num_blocks) {
for (int i = 0; i < num_blocks; i++) {
unique_ptr<fs::WritableBlock> block;
ASSERT_OK(fs_manager_->CreateNewBlock(&block));
block->Append("Test");
ASSERT_OK(block->Close());
}
}

// Test that we can clean up our downloaded blocks either explicitly using
// Abort() or implicitly by destroying the TabletCopyClient instance before
// calling Finish(). Also ensure that no data loss occurs.
TEST_P(TabletCopyClientAbortTest, TestAbort) {
tuple<DownloadBlocks, DeleteTrigger> param = GetParam();
DownloadBlocks download_blocks = std::get<0>(param);
DeleteTrigger trigger = std::get<1>(param);

// Check that there are remote blocks.
vector<BlockId> remote_block_ids = ListBlocks(*client_->remote_superblock_);
ASSERT_FALSE(remote_block_ids.empty());
int num_remote_blocks = client_->CountRemoteBlocks();
ASSERT_GT(num_remote_blocks, 0);
ASSERT_EQ(num_remote_blocks, remote_block_ids.size());

// Create some local blocks so we can check that we didn't lose any existing
// data on abort. TODO(mpercy): The data loss check here will likely never
// trigger until we fix KUDU-1980 because there is a workaround / hack in the
// LBM that randomizes the starting block id for each BlockManager instance.
// Therefore the block ids will never overlap.
const int kNumBlocksToCreate = 100;
NO_FATALS(CreateTestBlocks(kNumBlocksToCreate));

vector<BlockId> local_block_ids;
ASSERT_OK(fs_manager_->block_manager()->GetAllBlockIds(&local_block_ids));
ASSERT_EQ(kNumBlocksToCreate, local_block_ids.size());
VLOG(1) << "Local blocks: " << local_block_ids;

int num_blocks_downloaded = 0;
if (download_blocks == kDownloadBlocks) {
ASSERT_OK(client_->DownloadBlocks());
num_blocks_downloaded = num_remote_blocks;
}

vector<BlockId> new_local_block_ids;
ASSERT_OK(fs_manager_->block_manager()->GetAllBlockIds(&new_local_block_ids));
ASSERT_EQ(kNumBlocksToCreate + num_blocks_downloaded, new_local_block_ids.size());

// Download a WAL segment.
ASSERT_OK(fs_manager_->CreateDirIfMissing(fs_manager_->GetTabletWalDir(GetTabletId())));
uint64_t seqno = client_->wal_seqnos_[0];
ASSERT_OK(client_->DownloadWAL(seqno));
string wal_path = fs_manager_->GetWalSegmentFileName(GetTabletId(), seqno);
ASSERT_TRUE(fs_manager_->Exists(wal_path));

scoped_refptr<TabletMetadata> meta = client_->meta_;

switch (trigger) {
case kAbortMethod:
ASSERT_OK(client_->Abort());
break;
case kDestructor:
client_.reset();
break;
case kNoDelete:
// Call Finish() and then destroy the object.
// It should not delete its downloaded blocks.
ASSERT_OK(client_->Finish());
client_.reset();
break;
default:
FAIL();
}

if (trigger == kNoDelete) {
vector<BlockId> new_local_block_ids;
ASSERT_OK(fs_manager_->block_manager()->GetAllBlockIds(&new_local_block_ids));
ASSERT_EQ(kNumBlocksToCreate + num_blocks_downloaded, new_local_block_ids.size());
} else {
ASSERT_EQ(tablet::TABLET_DATA_TOMBSTONED, meta->tablet_data_state());
ASSERT_FALSE(fs_manager_->Exists(wal_path));
vector<BlockId> latest_blocks;
fs_manager_->block_manager()->GetAllBlockIds(&latest_blocks);
ASSERT_EQ(local_block_ids.size(), latest_blocks.size());
}
for (const auto& block_id : local_block_ids) {
ASSERT_TRUE(fs_manager_->BlockExists(block_id)) << "Missing block: " << block_id;
}
}

} // namespace tserver
} // namespace kudu
Loading

0 comments on commit 5ae27d7

Please sign in to comment.