diff --git a/src/kudu/integration-tests/delete_table-itest.cc b/src/kudu/integration-tests/delete_table-itest.cc index 73f58fffed..e0b0d95b7f 100644 --- a/src/kudu/integration-tests/delete_table-itest.cc +++ b/src/kudu/integration-tests/delete_table-itest.cc @@ -652,8 +652,8 @@ TEST_F(DeleteTableITest, TestAutoTombstoneAfterTabletCopyRemoteFails) { { vector status_pbs; ASSERT_OK(WaitForNumTabletsOnTS(ts, 1, kTimeout, &status_pbs)); - ASSERT_STR_CONTAINS(status_pbs[0].tablet_status().last_status(), - "Tombstoned tablet: Tablet Copy: Unable to fetch data from remote peer"); + ASSERT_STR_MATCHES(status_pbs[0].tablet_status().last_status(), + "Tablet Copy: Tombstoned tablet .*: Tablet copy aborted"); } // Now bring the other replicas back, re-elect the previous leader (TS-1), diff --git a/src/kudu/tserver/tablet_copy-test-base.h b/src/kudu/tserver/tablet_copy-test-base.h index 2d5fc9aea9..bb9bc458e6 100644 --- a/src/kudu/tserver/tablet_copy-test-base.h +++ b/src/kudu/tserver/tablet_copy-test-base.h @@ -20,6 +20,7 @@ #include "kudu/tserver/tablet_server-test-base.h" #include +#include #include "kudu/consensus/log_anchor_registry.h" #include "kudu/consensus/opid_util.h" @@ -34,8 +35,6 @@ namespace kudu { namespace tserver { -using consensus::MinimumOpId; - // Number of times to roll the log. static const int kNumLogRolls = 2; @@ -49,7 +48,7 @@ class TabletCopyTest : public TabletServerTestBase { // Flush(), Log GC is allowed to eat the logs before we get around to // starting a tablet copy session. tablet_peer_->log_anchor_registry()->Register( - MinimumOpId().index(), CURRENT_TEST_NAME(), &anchor_); + consensus::MinimumOpId().index(), CURRENT_TEST_NAME(), &anchor_); NO_FATALS(GenerateTestData()); } @@ -61,10 +60,35 @@ class TabletCopyTest : public TabletServerTestBase { protected: // Grab the first column block we find in the SuperBlock. 
static BlockId FirstColumnBlockId(const tablet::TabletSuperBlockPB& superblock) { + DCHECK_GT(superblock.rowsets_size(), 0); const tablet::RowSetDataPB& rowset = superblock.rowsets(0); + DCHECK_GT(rowset.columns_size(), 0); const tablet::ColumnDataPB& column = rowset.columns(0); - const BlockIdPB& block_id_pb = column.block(); - return BlockId::FromPB(block_id_pb); + return BlockId::FromPB(column.block()); + } + + // Return a vector of the blocks contained in the specified superblock (not + // including orphaned blocks). + static vector ListBlocks(const tablet::TabletSuperBlockPB& superblock) { + vector block_ids; + for (const auto& rowset : superblock.rowsets()) { + for (const auto& col : rowset.columns()) { + block_ids.emplace_back(col.block().id()); + } + for (const auto& redos : rowset.redo_deltas()) { + block_ids.emplace_back(redos.block().id()); + } + for (const auto& undos : rowset.undo_deltas()) { + block_ids.emplace_back(undos.block().id()); + } + if (rowset.has_bloom_block()) { + block_ids.emplace_back(rowset.bloom_block().id()); + } + if (rowset.has_adhoc_index_block()) { + block_ids.emplace_back(rowset.adhoc_index_block().id()); + } + } + return block_ids; } // Check that the contents and CRC32C of a DataChunkPB are equal to a local buffer. diff --git a/src/kudu/tserver/tablet_copy_client-test.cc b/src/kudu/tserver/tablet_copy_client-test.cc index b775c583cb..cf6cdd5ac4 100644 --- a/src/kudu/tserver/tablet_copy_client-test.cc +++ b/src/kudu/tserver/tablet_copy_client-test.cc @@ -16,6 +16,10 @@ // under the License. 
#include "kudu/tserver/tablet_copy-test-base.h" +#include + +#include + #include "kudu/consensus/quorum_util.h" #include "kudu/gutil/strings/fastmem.h" #include "kudu/tablet/tablet_bootstrap.h" @@ -29,6 +33,7 @@ namespace tserver { using consensus::GetRaftConfigLeader; using consensus::RaftPeerPB; +using std::tuple; using std::unique_ptr; using tablet::TabletMetadata; @@ -44,8 +49,8 @@ class TabletCopyClientTest : public TabletCopyTest { tablet_peer_->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10.0)); rpc::MessengerBuilder(CURRENT_TEST_NAME()).Build(&messenger_); client_.reset(new TabletCopyClient(GetTabletId(), - fs_manager_.get(), - messenger_)); + fs_manager_.get(), + messenger_)); ASSERT_OK(GetRaftConfigLeader(tablet_peer_->consensus() ->ConsensusState(consensus::CONSENSUS_CONFIG_COMMITTED), &leader_)); @@ -91,6 +96,12 @@ Status TabletCopyClientTest::CompareFileContents(const string& path1, const stri return Status::OK(); } +// Implementation test that no blocks exist in the new superblock before fetching. +TEST_F(TabletCopyClientTest, TestNoBlocksAtStart) { + ASSERT_GT(ListBlocks(*client_->remote_superblock_).size(), 0); + ASSERT_EQ(0, ListBlocks(*client_->superblock_).size()); +} + // Basic begin / end tablet copy session. TEST_F(TabletCopyClientTest, TestBeginEndSession) { ASSERT_OK(client_->FetchAll(nullptr /* no listener */)); @@ -99,7 +110,7 @@ TEST_F(TabletCopyClientTest, TestBeginEndSession) { // Basic data block download unit test. TEST_F(TabletCopyClientTest, TestDownloadBlock) { - BlockId block_id = FirstColumnBlockId(*client_->superblock_); + BlockId block_id = FirstColumnBlockId(*client_->remote_superblock_); Slice slice; faststring scratch; @@ -113,8 +124,6 @@ TEST_F(TabletCopyClientTest, TestDownloadBlock) { ASSERT_OK(client_->DownloadBlock(block_id, &new_block_id)); // Ensure it placed the block where we expected it to. 
- s = ReadLocalBlockFile(fs_manager_.get(), block_id, &scratch, &slice); - ASSERT_TRUE(s.IsNotFound()) << "Expected block not found: " << s.ToString(); ASSERT_OK(ReadLocalBlockFile(fs_manager_.get(), new_block_id, &scratch, &slice)); } @@ -174,67 +183,140 @@ TEST_F(TabletCopyClientTest, TestVerifyData) { LOG(INFO) << "Expected error returned: " << s.ToString(); } -namespace { - -vector GetAllSortedBlocks(const tablet::TabletSuperBlockPB& sb) { - vector data_blocks; - - for (const tablet::RowSetDataPB& rowset : sb.rowsets()) { - for (const tablet::DeltaDataPB& redo : rowset.redo_deltas()) { - data_blocks.push_back(BlockId::FromPB(redo.block())); - } - for (const tablet::DeltaDataPB& undo : rowset.undo_deltas()) { - data_blocks.push_back(BlockId::FromPB(undo.block())); - } - for (const tablet::ColumnDataPB& column : rowset.columns()) { - data_blocks.push_back(BlockId::FromPB(column.block())); - } - if (rowset.has_bloom_block()) { - data_blocks.push_back(BlockId::FromPB(rowset.bloom_block())); - } - if (rowset.has_adhoc_index_block()) { - data_blocks.push_back(BlockId::FromPB(rowset.adhoc_index_block())); - } - } - - std::sort(data_blocks.begin(), data_blocks.end(), BlockIdCompare()); - return data_blocks; -} - -} // anonymous namespace - TEST_F(TabletCopyClientTest, TestDownloadAllBlocks) { // Download all the blocks. ASSERT_OK(client_->DownloadBlocks()); - // Verify that the new superblock reflects the changes in block IDs. - // - // As long as block IDs are generated with UUIDs or something equally - // unique, there's no danger of a block in the new superblock somehow - // being assigned the same ID as a block in the existing superblock. 
- vector old_data_blocks = GetAllSortedBlocks(*client_->superblock_.get()); - vector new_data_blocks = GetAllSortedBlocks(*client_->new_superblock_.get()); - vector result; - std::set_intersection(old_data_blocks.begin(), old_data_blocks.end(), - new_data_blocks.begin(), new_data_blocks.end(), - std::back_inserter(result), BlockIdCompare()); - ASSERT_TRUE(result.empty()); + // After downloading blocks, verify that the old and remote and local + // superblock point to the same number of blocks. + vector old_data_blocks = ListBlocks(*client_->remote_superblock_); + vector new_data_blocks = ListBlocks(*client_->superblock_); ASSERT_EQ(old_data_blocks.size(), new_data_blocks.size()); - // Verify that the old blocks aren't found. We're using a different - // FsManager than 'tablet_peer', so the only way an old block could end - // up in ours is due to a tablet copy client bug. - for (const BlockId& block_id : old_data_blocks) { - unique_ptr block; - Status s = fs_manager_->OpenBlock(block_id, &block); - ASSERT_TRUE(s.IsNotFound()) << "Expected block not found: " << s.ToString(); - } - // And the new blocks are all present. + // Verify that the new blocks are all present. for (const BlockId& block_id : new_data_blocks) { unique_ptr block; ASSERT_OK(fs_manager_->OpenBlock(block_id, &block)); } } +enum DownloadBlocks { + kDownloadBlocks, // Fetch blocks from remote. + kNoDownloadBlocks, // Do not fetch blocks from remote. +}; +enum DeleteTrigger { + kAbortMethod, // Delete data via Abort(). + kDestructor, // Delete data via destructor. + kNoDelete // Don't delete data. +}; + +struct AbortTestParams { + DownloadBlocks download_blocks; + DeleteTrigger delete_type; +}; + +class TabletCopyClientAbortTest : public TabletCopyClientTest, + public ::testing::WithParamInterface< + tuple> { + protected: + // Create the specified number of blocks with junk data for testing purposes. 
+ void CreateTestBlocks(int num_blocks); +}; + +INSTANTIATE_TEST_CASE_P(BlockDeleteTriggers, + TabletCopyClientAbortTest, + ::testing::Combine( + ::testing::Values(kDownloadBlocks, kNoDownloadBlocks), + ::testing::Values(kAbortMethod, kDestructor, kNoDelete))); + +void TabletCopyClientAbortTest::CreateTestBlocks(int num_blocks) { + for (int i = 0; i < num_blocks; i++) { + unique_ptr block; + ASSERT_OK(fs_manager_->CreateNewBlock(&block)); + block->Append("Test"); + ASSERT_OK(block->Close()); + } +} + +// Test that we can clean up our downloaded blocks either explicitly using +// Abort() or implicitly by destroying the TabletCopyClient instance before +// calling Finish(). Also ensure that no data loss occurs. +TEST_P(TabletCopyClientAbortTest, TestAbort) { + tuple param = GetParam(); + DownloadBlocks download_blocks = std::get<0>(param); + DeleteTrigger trigger = std::get<1>(param); + + // Check that there are remote blocks. + vector remote_block_ids = ListBlocks(*client_->remote_superblock_); + ASSERT_FALSE(remote_block_ids.empty()); + int num_remote_blocks = client_->CountRemoteBlocks(); + ASSERT_GT(num_remote_blocks, 0); + ASSERT_EQ(num_remote_blocks, remote_block_ids.size()); + + // Create some local blocks so we can check that we didn't lose any existing + // data on abort. TODO(mpercy): The data loss check here will likely never + // trigger until we fix KUDU-1980 because there is a workaround / hack in the + // LBM that randomizes the starting block id for each BlockManager instance. + // Therefore the block ids will never overlap. 
+ const int kNumBlocksToCreate = 100; + NO_FATALS(CreateTestBlocks(kNumBlocksToCreate)); + + vector local_block_ids; + ASSERT_OK(fs_manager_->block_manager()->GetAllBlockIds(&local_block_ids)); + ASSERT_EQ(kNumBlocksToCreate, local_block_ids.size()); + VLOG(1) << "Local blocks: " << local_block_ids; + + int num_blocks_downloaded = 0; + if (download_blocks == kDownloadBlocks) { + ASSERT_OK(client_->DownloadBlocks()); + num_blocks_downloaded = num_remote_blocks; + } + + vector new_local_block_ids; + ASSERT_OK(fs_manager_->block_manager()->GetAllBlockIds(&new_local_block_ids)); + ASSERT_EQ(kNumBlocksToCreate + num_blocks_downloaded, new_local_block_ids.size()); + + // Download a WAL segment. + ASSERT_OK(fs_manager_->CreateDirIfMissing(fs_manager_->GetTabletWalDir(GetTabletId()))); + uint64_t seqno = client_->wal_seqnos_[0]; + ASSERT_OK(client_->DownloadWAL(seqno)); + string wal_path = fs_manager_->GetWalSegmentFileName(GetTabletId(), seqno); + ASSERT_TRUE(fs_manager_->Exists(wal_path)); + + scoped_refptr meta = client_->meta_; + + switch (trigger) { + case kAbortMethod: + ASSERT_OK(client_->Abort()); + break; + case kDestructor: + client_.reset(); + break; + case kNoDelete: + // Call Finish() and then destroy the object. + // It should not delete its downloaded blocks. 
+ ASSERT_OK(client_->Finish()); + client_.reset(); + break; + default: + FAIL(); + } + + if (trigger == kNoDelete) { + vector new_local_block_ids; + ASSERT_OK(fs_manager_->block_manager()->GetAllBlockIds(&new_local_block_ids)); + ASSERT_EQ(kNumBlocksToCreate + num_blocks_downloaded, new_local_block_ids.size()); + } else { + ASSERT_EQ(tablet::TABLET_DATA_TOMBSTONED, meta->tablet_data_state()); + ASSERT_FALSE(fs_manager_->Exists(wal_path)); + vector latest_blocks; + fs_manager_->block_manager()->GetAllBlockIds(&latest_blocks); + ASSERT_EQ(local_block_ids.size(), latest_blocks.size()); + } + for (const auto& block_id : local_block_ids) { + ASSERT_TRUE(fs_manager_->BlockExists(block_id)) << "Missing block: " << block_id; + } +} + } // namespace tserver } // namespace kudu diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc index 856ee755b8..495074fe20 100644 --- a/src/kudu/tserver/tablet_copy_client.cc +++ b/src/kudu/tserver/tablet_copy_client.cc @@ -38,6 +38,7 @@ #include "kudu/tserver/tablet_copy.pb.h" #include "kudu/tserver/tablet_copy.proxy.h" #include "kudu/tserver/tablet_server.h" +#include "kudu/tserver/ts_tablet_manager.h" #include "kudu/util/crc.h" #include "kudu/util/env.h" #include "kudu/util/env_util.h" @@ -95,14 +96,12 @@ using tablet::TabletStatusListener; using tablet::TabletSuperBlockPB; TabletCopyClient::TabletCopyClient(std::string tablet_id, - FsManager* fs_manager, - shared_ptr messenger) + FsManager* fs_manager, + shared_ptr messenger) : tablet_id_(std::move(tablet_id)), fs_manager_(fs_manager), messenger_(std::move(messenger)), - started_(false), - downloaded_wal_(false), - downloaded_blocks_(false), + state_(kInitialized), replace_tombstoned_tablet_(false), status_listener_(nullptr), session_idle_timeout_millis_(0), @@ -110,11 +109,14 @@ TabletCopyClient::TabletCopyClient(std::string tablet_id, TabletCopyClient::~TabletCopyClient() { // Note: Ending the tablet copy session releases anchors on the remote. 
- WARN_NOT_OK(EndRemoteSession(), "Unable to close tablet copy session"); + WARN_NOT_OK(EndRemoteSession(), Substitute("$0Unable to close tablet copy session", + LogPrefix())); + WARN_NOT_OK(Abort(), Substitute("$0Failed to fully clean up tablet after aborted copy", + LogPrefix())); } Status TabletCopyClient::SetTabletToReplace(const scoped_refptr& meta, - int64_t caller_term) { + int64_t caller_term) { CHECK_EQ(tablet_id_, meta->tablet_id()); TabletDataState data_state = meta->tablet_data_state(); if (data_state != tablet::TABLET_DATA_TOMBSTONED) { @@ -150,8 +152,8 @@ Status TabletCopyClient::SetTabletToReplace(const scoped_refptr& } Status TabletCopyClient::Start(const HostPort& copy_source_addr, - scoped_refptr* meta) { - CHECK(!started_); + scoped_refptr* meta) { + CHECK_EQ(kInitialized, state_); start_time_micros_ = GetCurrentTimeMicros(); Sockaddr addr; @@ -192,8 +194,24 @@ Status TabletCopyClient::Start(const HostPort& copy_source_addr, session_id_ = resp.session_id(); session_idle_timeout_millis_ = resp.session_idle_timeout_millis(); - superblock_.reset(resp.release_superblock()); + + // Store a copy of the remote (old) superblock. + remote_superblock_.reset(resp.release_superblock()); + + // Make a copy of the remote superblock. We first clear out the remote blocks + // from this structure and then add them back in as they are downloaded. + superblock_.reset(new TabletSuperBlockPB(*remote_superblock_)); + + // The block ids (in active rowsets as well as from orphaned blocks) on the + // remote have no meaning to us and could cause data loss if accidentally + // deleted locally. We must clear them all. + superblock_->clear_rowsets(); + superblock_->clear_orphaned_blocks(); + + // Set the data state to COPYING to indicate that, on crash, this replica + // should be discarded. 
superblock_->set_tablet_data_state(tablet::TABLET_DATA_COPYING); + wal_seqnos_.assign(resp.wal_segment_seqnos().begin(), resp.wal_segment_seqnos().end()); remote_committed_cstate_.reset(resp.release_initial_committed_cstate()); @@ -212,19 +230,19 @@ Status TabletCopyClient::Start(const HostPort& copy_source_addr, return Status::InvalidArgument( Substitute("Tablet $0: source peer has term $1 but " "tombstoned replica has last-logged opid with higher term $2. " - "Refusing tablet copy from source peer $3", - tablet_id_, - remote_committed_cstate_->current_term(), - last_logged_term, - copy_peer_uuid)); + "Refusing tablet copy from source peer $3", + tablet_id_, + remote_committed_cstate_->current_term(), + last_logged_term, + copy_peer_uuid)); } - // Remove any existing orphaned blocks from the tablet, and + // Remove any existing orphaned blocks and WALs from the tablet, and // set the data state to 'COPYING'. - RETURN_NOT_OK_PREPEND(meta_->DeleteTabletData(tablet::TABLET_DATA_COPYING, boost::none), - "Couldn't replace superblock with COPYING data state"); + RETURN_NOT_OK_PREPEND( + TSTabletManager::DeleteTabletData(meta_, tablet::TABLET_DATA_COPYING, boost::none), + "Could not replace superblock with COPYING data state"); } else { - Partition partition; Partition::FromPB(superblock_->partition(), &partition); PartitionSchema partition_schema; @@ -242,7 +260,7 @@ Status TabletCopyClient::Start(const HostPort& copy_source_addr, &meta_)); } - started_ = true; + state_ = kStarted; if (meta) { *meta = meta_; } @@ -250,7 +268,8 @@ Status TabletCopyClient::Start(const HostPort& copy_source_addr, } Status TabletCopyClient::FetchAll(TabletStatusListener* status_listener) { - CHECK(started_); + CHECK_EQ(kStarted, state_); + status_listener_ = status_listener; // Download all the files (serially, for now, but in parallel in the future). 
@@ -262,9 +281,8 @@ Status TabletCopyClient::FetchAll(TabletStatusListener* status_listener) { Status TabletCopyClient::Finish() { CHECK(meta_); - CHECK(started_); - CHECK(downloaded_wal_); - CHECK(downloaded_blocks_); + CHECK_EQ(kStarted, state_); + state_ = kFinished; RETURN_NOT_OK(WriteConsensusMetadata()); @@ -273,8 +291,8 @@ Status TabletCopyClient::Finish() { // superblock is in a valid state to bootstrap from. LOG_WITH_PREFIX(INFO) << "Tablet Copy complete. Replacing tablet superblock."; UpdateStatusMessage("Replacing tablet superblock"); - new_superblock_->set_tablet_data_state(tablet::TABLET_DATA_READY); - RETURN_NOT_OK(meta_->ReplaceSuperBlock(*new_superblock_)); + superblock_->set_tablet_data_state(tablet::TABLET_DATA_READY); + RETURN_NOT_OK(meta_->ReplaceSuperBlock(*superblock_)); if (FLAGS_tablet_copy_save_downloaded_metadata) { string meta_path = fs_manager_->GetTabletMetadataPath(tablet_id_); @@ -287,6 +305,27 @@ Status TabletCopyClient::Finish() { return Status::OK(); } +Status TabletCopyClient::Abort() { + if (state_ != kStarted) { + return Status::OK(); + } + state_ = kFinished; + CHECK(meta_); + + // Write the in-progress superblock to disk so that when we delete the tablet + // data all the partial blocks we have persisted will be deleted. + DCHECK_EQ(tablet::TABLET_DATA_COPYING, superblock_->tablet_data_state()); + RETURN_NOT_OK(meta_->ReplaceSuperBlock(*superblock_)); + + // Delete all of the tablet data, including blocks and WALs. + RETURN_NOT_OK_PREPEND( + TSTabletManager::DeleteTabletData(meta_, tablet::TABLET_DATA_TOMBSTONED, boost::none), + LogPrefix() + "Failed to tombstone tablet after aborting tablet copy"); + + UpdateStatusMessage(Substitute("Tombstoned tablet $0: Tablet copy aborted", tablet_id_)); + return Status::OK(); +} + // Decode the remote error into a human-readable Status object. 
Status TabletCopyClient::ExtractRemoteError(const rpc::ErrorStatusPB& remote_error) { if (PREDICT_TRUE(remote_error.HasExtension(TabletCopyErrorPB::tablet_copy_error_ext))) { @@ -302,7 +341,7 @@ Status TabletCopyClient::ExtractRemoteError(const rpc::ErrorStatusPB& remote_err // Enhance a RemoteError Status message with additional details from the remote. Status TabletCopyClient::UnwindRemoteError(const Status& status, - const rpc::RpcController& controller) { + const rpc::RpcController& controller) { if (!status.IsRemoteError()) { return status; } @@ -312,18 +351,17 @@ Status TabletCopyClient::UnwindRemoteError(const Status& status, void TabletCopyClient::UpdateStatusMessage(const string& message) { if (status_listener_ != nullptr) { - status_listener_->StatusMessage("TabletCopy: " + message); + status_listener_->StatusMessage(Substitute("Tablet Copy: $0", message)); } } Status TabletCopyClient::EndRemoteSession() { - if (!started_) { + if (state_ == kInitialized) { return Status::OK(); } rpc::RpcController controller; - controller.set_timeout(MonoDelta::FromMilliseconds( - FLAGS_tablet_copy_begin_session_timeout_ms)); + controller.set_timeout(MonoDelta::FromMilliseconds(FLAGS_tablet_copy_begin_session_timeout_ms)); EndTabletCopySessionRequestPB req; req.set_session_id(session_id_); @@ -337,7 +375,7 @@ Status TabletCopyClient::EndRemoteSession() { } Status TabletCopyClient::DownloadWALs() { - CHECK(started_); + CHECK_EQ(kStarted, state_); // Delete and recreate WAL dir if it already exists, to ensure stray files are // not kept from previous copies and runs. @@ -359,16 +397,12 @@ Status TabletCopyClient::DownloadWALs() { ++counter; } - downloaded_wal_ = true; return Status::OK(); } -Status TabletCopyClient::DownloadBlocks() { - CHECK(started_); - - // Count up the total number of blocks to download. 
+int TabletCopyClient::CountRemoteBlocks() const { int num_blocks = 0; - for (const RowSetDataPB& rowset : superblock_->rowsets()) { + for (const RowSetDataPB& rowset : remote_superblock_->rowsets()) { num_blocks += rowset.columns_size(); num_blocks += rowset.redo_deltas_size(); num_blocks += rowset.undo_deltas_size(); @@ -379,42 +413,74 @@ Status TabletCopyClient::DownloadBlocks() { num_blocks++; } } + return num_blocks; +} + +Status TabletCopyClient::DownloadBlocks() { + CHECK_EQ(kStarted, state_); + + // Count up the total number of blocks to download. + int num_remote_blocks = CountRemoteBlocks(); // Download each block, writing the new block IDs into the new superblock // as each block downloads. - gscoped_ptr new_sb(new TabletSuperBlockPB()); - new_sb->CopyFrom(*superblock_); int block_count = 0; - LOG_WITH_PREFIX(INFO) << "Starting download of " << num_blocks << " data blocks..."; - for (RowSetDataPB& rowset : *new_sb->mutable_rowsets()) { - for (ColumnDataPB& col : *rowset.mutable_columns()) { - RETURN_NOT_OK(DownloadAndRewriteBlock(col.mutable_block(), - &block_count, num_blocks)); + LOG_WITH_PREFIX(INFO) << "Starting download of " << num_remote_blocks << " data blocks..."; + for (const RowSetDataPB& src_rowset : remote_superblock_->rowsets()) { + // Create rowset. + RowSetDataPB* dst_rowset = superblock_->add_rowsets(); + *dst_rowset = src_rowset; + // Clear the data in the rowset so that we don't end up deleting the wrong + // blocks (using the ids of the remote blocks) if we fail. + // TODO(mpercy): This is pretty fragile. Consider building a class + // structure on top of SuperBlockPB to abstract copying details. 
+ dst_rowset->clear_columns(); + dst_rowset->clear_redo_deltas(); + dst_rowset->clear_undo_deltas(); + dst_rowset->clear_bloom_block(); + dst_rowset->clear_adhoc_index_block(); + + // We can't leave superblock_ unserializable with unset required field + // values in child elements, so we must download and rewrite each block + // before referencing it in the rowset. + for (const ColumnDataPB& src_col : src_rowset.columns()) { + BlockIdPB new_block_id; + RETURN_NOT_OK(DownloadAndRewriteBlock(src_col.block(), num_remote_blocks, + &block_count, &new_block_id)); + ColumnDataPB* dst_col = dst_rowset->add_columns(); + *dst_col = src_col; + *dst_col->mutable_block() = new_block_id; } - for (DeltaDataPB& redo : *rowset.mutable_redo_deltas()) { - RETURN_NOT_OK(DownloadAndRewriteBlock(redo.mutable_block(), - &block_count, num_blocks)); + for (const DeltaDataPB& src_redo : src_rowset.redo_deltas()) { + BlockIdPB new_block_id; + RETURN_NOT_OK(DownloadAndRewriteBlock(src_redo.block(), num_remote_blocks, + &block_count, &new_block_id)); + DeltaDataPB* dst_redo = dst_rowset->add_redo_deltas(); + *dst_redo = src_redo; + *dst_redo->mutable_block() = new_block_id; } - for (DeltaDataPB& undo : *rowset.mutable_undo_deltas()) { - RETURN_NOT_OK(DownloadAndRewriteBlock(undo.mutable_block(), - &block_count, num_blocks)); + for (const DeltaDataPB& src_undo : src_rowset.undo_deltas()) { + BlockIdPB new_block_id; + RETURN_NOT_OK(DownloadAndRewriteBlock(src_undo.block(), num_remote_blocks, + &block_count, &new_block_id)); + DeltaDataPB* dst_undo = dst_rowset->add_undo_deltas(); + *dst_undo = src_undo; + *dst_undo->mutable_block() = new_block_id; } - if (rowset.has_bloom_block()) { - RETURN_NOT_OK(DownloadAndRewriteBlock(rowset.mutable_bloom_block(), - &block_count, num_blocks)); + if (src_rowset.has_bloom_block()) { + BlockIdPB new_block_id; + RETURN_NOT_OK(DownloadAndRewriteBlock(src_rowset.bloom_block(), num_remote_blocks, + &block_count, &new_block_id)); + *dst_rowset->mutable_bloom_block() 
= new_block_id; } - if (rowset.has_adhoc_index_block()) { - RETURN_NOT_OK(DownloadAndRewriteBlock(rowset.mutable_adhoc_index_block(), - &block_count, num_blocks)); + if (src_rowset.has_adhoc_index_block()) { + BlockIdPB new_block_id; + RETURN_NOT_OK(DownloadAndRewriteBlock(src_rowset.adhoc_index_block(), num_remote_blocks, + &block_count, &new_block_id)); + *dst_rowset->mutable_adhoc_index_block() = new_block_id; } } - // The orphaned physical block ids at the remote have no meaning to us. - new_sb->clear_orphaned_blocks(); - new_superblock_.swap(new_sb); - - downloaded_blocks_ = true; - return Status::OK(); } @@ -462,23 +528,25 @@ Status TabletCopyClient::WriteConsensusMetadata() { return Status::OK(); } -Status TabletCopyClient::DownloadAndRewriteBlock(BlockIdPB* block_id, - int* block_count, int num_blocks) { - BlockId old_block_id(BlockId::FromPB(*block_id)); +Status TabletCopyClient::DownloadAndRewriteBlock(const BlockIdPB& src_block_id, + int num_blocks, + int* block_count, + BlockIdPB* dest_block_id) { + BlockId old_block_id(BlockId::FromPB(src_block_id)); UpdateStatusMessage(Substitute("Downloading block $0 ($1/$2)", - old_block_id.ToString(), *block_count, - num_blocks)); + old_block_id.ToString(), + *block_count + 1, num_blocks)); BlockId new_block_id; RETURN_NOT_OK_PREPEND(DownloadBlock(old_block_id, &new_block_id), "Unable to download block with id " + old_block_id.ToString()); - new_block_id.CopyToPB(block_id); + new_block_id.CopyToPB(dest_block_id); (*block_count)++; return Status::OK(); } Status TabletCopyClient::DownloadBlock(const BlockId& old_block_id, - BlockId* new_block_id) { + BlockId* new_block_id) { VLOG_WITH_PREFIX(1) << "Downloading block with block_id " << old_block_id.ToString(); unique_ptr block; @@ -499,7 +567,7 @@ Status TabletCopyClient::DownloadBlock(const BlockId& old_block_id, template Status TabletCopyClient::DownloadFile(const DataIdPB& data_id, - Appendable* appendable) { + Appendable* appendable) { uint64_t offset = 0; 
rpc::RpcController controller; controller.set_timeout(MonoDelta::FromMilliseconds(session_idle_timeout_millis_)); diff --git a/src/kudu/tserver/tablet_copy_client.h b/src/kudu/tserver/tablet_copy_client.h index de2a730d17..3545915f2d 100644 --- a/src/kudu/tserver/tablet_copy_client.h +++ b/src/kudu/tserver/tablet_copy_client.h @@ -23,14 +23,13 @@ #include -#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/fs/block_id.h" #include "kudu/gutil/macros.h" #include "kudu/gutil/ref_counted.h" #include "kudu/util/status.h" namespace kudu { -class BlockId; class BlockIdPB; class FsManager; class HostPort; @@ -95,22 +94,35 @@ class TabletCopyClient { // otherwise the TabletMetadata object resulting from the initial remote // bootstrap response is returned. Status Start(const HostPort& copy_source_addr, - scoped_refptr* metadata); + scoped_refptr* meta); // Runs a "full" tablet copy, copying the physical layout of a tablet // from the leader of the specified consensus configuration. Status FetchAll(tablet::TabletStatusListener* status_listener); // After downloading all files successfully, write out the completed - // replacement superblock. + // replacement superblock. Must be called after Start() and FetchAll(). + // Must not be called after Abort(). Status Finish(); + // Abort an in-progress transfer and immediately delete the data blocks and + // WALs downloaded so far. Does nothing if called after Finish(). + Status Abort(); + private: + FRIEND_TEST(TabletCopyClientTest, TestNoBlocksAtStart); FRIEND_TEST(TabletCopyClientTest, TestBeginEndSession); FRIEND_TEST(TabletCopyClientTest, TestDownloadBlock); FRIEND_TEST(TabletCopyClientTest, TestVerifyData); FRIEND_TEST(TabletCopyClientTest, TestDownloadWalSegment); FRIEND_TEST(TabletCopyClientTest, TestDownloadAllBlocks); + FRIEND_TEST(TabletCopyClientAbortTest, TestAbort); + + enum State { + kInitialized, + kStarted, + kFinished, + }; // Extract the embedded Status message from the given ErrorStatusPB. 
// The given ErrorStatusPB must extend TabletCopyErrorPB. @@ -137,19 +149,26 @@ class TabletCopyClient { // downloaded as part of initiating the tablet copy session. Status WriteConsensusMetadata(); + // Count the number of blocks on the remote (from 'remote_superblock_'). + int CountRemoteBlocks() const; + // Download all blocks belonging to a tablet sequentially. // - // Blocks are given new IDs upon creation. On success, 'new_superblock_' - // is populated to reflect the new block IDs and should be used in lieu - // of 'superblock_' henceforth. + // Blocks are given new IDs upon creation. On success, 'superblock_' + // is populated to reflect the new block IDs. Status DownloadBlocks(); - // Download the block specified by 'block_id'. + // Download the remote block specified by 'src_block_id'. 'num_blocks' should + // be given as the total number of blocks there are to download (for logging + // purposes). // // On success: - // - 'block_id' is set to the new ID of the downloaded block. - // - 'block_count' is incremented. - Status DownloadAndRewriteBlock(BlockIdPB* block_id, int* block_count, int num_blocks); + // - 'dest_block_id' is set to the new ID of the downloaded block. + // - 'block_count' is incremented by 1. + Status DownloadAndRewriteBlock(const BlockIdPB& src_block_id, + int num_blocks, + int* block_count, + BlockIdPB* dest_block_id); // Download a single block. // Data block is opened with options so that it will fsync() on close. @@ -177,10 +196,8 @@ class TabletCopyClient { FsManager* const fs_manager_; const std::shared_ptr messenger_; - // State flags that enforce the progress of tablet copy. - bool started_; // Session started. - bool downloaded_wal_; // WAL segments downloaded. - bool downloaded_blocks_; // Data blocks downloaded. + // State of the progress of the tablet copy operation. + State state_; // Session-specific data items. 
bool replace_tombstoned_tablet_; @@ -196,9 +213,9 @@ class TabletCopyClient { std::shared_ptr proxy_; std::string session_id_; uint64_t session_idle_timeout_millis_; - gscoped_ptr superblock_; - gscoped_ptr new_superblock_; - gscoped_ptr remote_committed_cstate_; + std::unique_ptr remote_superblock_; + std::unique_ptr superblock_; + std::unique_ptr remote_committed_cstate_; std::vector wal_seqnos_; int64_t start_time_micros_; diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc index f8a0f4fa02..e5df72c3e6 100644 --- a/src/kudu/tserver/tablet_service.cc +++ b/src/kudu/tserver/tablet_service.cc @@ -1087,11 +1087,10 @@ void TabletServiceImpl::ScannerKeepAlive(const ScannerKeepAliveRequestPB *req, DCHECK(req->has_scanner_id()); SharedScanner scanner; if (!server_->scanner_manager()->LookupScanner(req->scanner_id(), &scanner)) { - resp->mutable_error()->set_code(TabletServerErrorPB::SCANNER_EXPIRED); - StatusToPB(Status::NotFound("Scanner not found"), - resp->mutable_error()->mutable_status()); - context->RespondSuccess(); - return; + resp->mutable_error()->set_code(TabletServerErrorPB::SCANNER_EXPIRED); + StatusToPB(Status::NotFound("Scanner not found"), resp->mutable_error()->mutable_status()); + context->RespondSuccess(); + return; } scanner->UpdateAccessTime(); context->RespondSuccess(); diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc index 8214655393..9bfa941fb4 100644 --- a/src/kudu/tserver/ts_tablet_manager.cc +++ b/src/kudu/tserver/ts_tablet_manager.cc @@ -317,16 +317,6 @@ Status TSTabletManager::CreateNewTablet(const string& table_id, return Status::OK(); } -// If 'expr' fails, log a message, tombstone the given tablet, and return. 
-#define TOMBSTONE_NOT_OK(expr, peer, msg) \ - do { \ - const Status& _s = (expr); \ - if (PREDICT_FALSE(!_s.ok())) { \ - LogAndTombstone((peer), (msg), _s); \ - return; \ - } \ - } while (0) - Status TSTabletManager::CheckLeaderTermNotLower(const string& tablet_id, int64_t leader_term, int64_t last_logged_term) { @@ -424,8 +414,9 @@ void TSTabletManager::RunTabletCopy( TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR; - const string& tablet_id = req->tablet_id(); - const string& copy_source_uuid = req->copy_peer_uuid(); + // Copy these strings so they stay valid even after responding to the request. + string tablet_id = req->tablet_id(); // NOLINT(*) + string copy_source_uuid = req->copy_peer_uuid(); // NOLINT(*) HostPort copy_source_addr; CALLBACK_RETURN_NOT_OK(HostPortFromPB(req->copy_peer_addr(), &copy_source_addr)); int64_t leader_term = req->caller_term(); @@ -454,7 +445,7 @@ void TSTabletManager::RunTabletCopy( switch (data_state) { case TABLET_DATA_COPYING: // This should not be possible due to the transition_in_progress_ "lock". - LOG(FATAL) << LogPrefix(tablet_id) << " Tablet Copy: " + LOG(FATAL) << LogPrefix(tablet_id) << "Tablet Copy: " << "Found tablet in TABLET_DATA_COPYING state during StartTabletCopy()"; case TABLET_DATA_TOMBSTONED: { int64_t last_logged_term = meta->tombstone_last_logged_opid().term(); @@ -474,10 +465,10 @@ void TSTabletManager::RunTabletCopy( // Tombstone the tablet and store the last-logged OpId. old_tablet_peer->Shutdown(); - // TODO: Because we begin shutdown of the tablet after we check our + // TODO(mpercy): Because we begin shutdown of the tablet after we check our // last-logged term against the leader's term, there may be operations - // in flight and it may be possible for the same check in the remote - // bootstrap client Start() method to fail. This will leave the replica in + // in flight and it may be possible for the same check in the tablet + // copy client Start() method to fail.
This will leave the replica in // a tombstoned state, and then the leader with the latest log entries // will simply tablet copy this replica again. We could try to // check again after calling Shutdown(), and if the check fails, try to @@ -531,15 +522,22 @@ void TSTabletManager::RunTabletCopy( // Download all of the remote files. Status s = tc_client.FetchAll(implicit_cast(tablet_peer.get())); - TOMBSTONE_NOT_OK(s, tablet_peer, - Substitute("Tablet Copy: Unable to fetch data from remote peer $0", - kSrcPeerInfo)); + if (!s.ok()) { + LOG(WARNING) << LogPrefix(tablet_id) << "Tablet Copy: Unable to fetch data from remote peer " + << kSrcPeerInfo << ": " << s.ToString(); + return; + } MAYBE_FAULT(FLAGS_fault_crash_after_tc_files_fetched); // Write out the last files to make the new replica visible and update the // TabletDataState in the superblock to TABLET_DATA_READY. - TOMBSTONE_NOT_OK(tc_client.Finish(), tablet_peer, "Tablet Copy: Failure calling Finish()"); + s = tc_client.Finish(); + if (!s.ok()) { + LOG(WARNING) << LogPrefix(tablet_id) << "Tablet Copy: Failure calling Finish(): " + << s.ToString(); + return; + } // We don't tombstone the tablet if opening the tablet fails, because on next // startup it's still in a valid, fully-copied state. 
@@ -698,7 +696,7 @@ Status TSTabletManager::StartTabletStateTransitionUnlocked( Status TSTabletManager::OpenTabletMeta(const string& tablet_id, scoped_refptr* metadata) { - LOG(INFO) << "Loading metadata for tablet " << tablet_id; + LOG(INFO) << LogPrefix(tablet_id) << "Loading tablet metadata"; TRACE("Loading metadata..."); scoped_refptr meta; RETURN_NOT_OK_PREPEND(TabletMetadata::Load(fs_manager_, tablet_id, &meta), @@ -860,7 +858,9 @@ void TSTabletManager::RegisterTablet(const std::string& tablet_id, LOG(FATAL) << "Unable to register tablet peer " << tablet_id << ": already registered!"; } - LOG(INFO) << "Registered tablet " << tablet_id; + TabletDataState data_state = tablet_peer->tablet_metadata()->tablet_data_state(); + LOG(INFO) << LogPrefix(tablet_id) << Substitute("Registered tablet (data state: $0)", + TabletDataState_Name(data_state)); } bool TSTabletManager::LookupTablet(const string& tablet_id, @@ -1021,7 +1021,8 @@ Status TSTabletManager::DeleteTabletData(const scoped_refptr& me << "Deleting tablet data with delete state " << TabletDataState_Name(data_state); CHECK(data_state == TABLET_DATA_DELETED || - data_state == TABLET_DATA_TOMBSTONED) + data_state == TABLET_DATA_TOMBSTONED || + data_state == TABLET_DATA_COPYING) << "Unexpected data_state to delete tablet " << meta->tablet_id() << ": " << TabletDataState_Name(data_state) << " (" << data_state << ")"; @@ -1037,33 +1038,19 @@ Status TSTabletManager::DeleteTabletData(const scoped_refptr& me MAYBE_FAULT(FLAGS_fault_crash_after_wal_deleted); // We do not delete the superblock or the consensus metadata when tombstoning - // a tablet. - if (data_state == TABLET_DATA_TOMBSTONED) { + // a tablet or marking it as entering the tablet copy process. + if (data_state == TABLET_DATA_COPYING || + data_state == TABLET_DATA_TOMBSTONED) { return Status::OK(); } // Only TABLET_DATA_DELETED tablets get this far. 
+ DCHECK_EQ(TABLET_DATA_DELETED, data_state); RETURN_NOT_OK(ConsensusMetadata::DeleteOnDiskData(meta->fs_manager(), meta->tablet_id())); MAYBE_FAULT(FLAGS_fault_crash_after_cmeta_deleted); return meta->DeleteSuperBlock(); } -void TSTabletManager::LogAndTombstone(const scoped_refptr& peer, - const std::string& msg, - const Status& s) { - const string& tablet_id = peer->tablet_id(); - LOG(WARNING) << LogPrefix(tablet_id) << msg << ": " << s.ToString(); - - Status delete_status = DeleteTabletData( - peer->tablet_metadata(), TABLET_DATA_TOMBSTONED, boost::optional()); - if (PREDICT_FALSE(!delete_status.ok())) { - // This failure should only either indicate a bug or an IO error. - LOG(FATAL) << LogPrefix(tablet_id) << "Failed to tombstone tablet after tablet copy: " - << delete_status.ToString(); - } - peer->StatusMessage(Substitute("Tombstoned tablet: $0 ($1)", msg, s.ToString())); -} - TransitionInProgressDeleter::TransitionInProgressDeleter( TransitionInProgressMap* map, rw_spinlock* lock, string entry) : in_progress_(map), lock_(lock), entry_(std::move(entry)) {} diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h index 43f5396149..6f437be5de 100644 --- a/src/kudu/tserver/ts_tablet_manager.h +++ b/src/kudu/tserver/ts_tablet_manager.h @@ -270,13 +270,6 @@ class TSTabletManager : public tserver::TabletPeerLookupIf { int64_t leader_term, int64_t last_logged_term); - // Print a log message using the given info and tombstone the specified - // tablet. If tombstoning the tablet fails, a FATAL error is logged, resulting - // in a crash. - void LogAndTombstone(const scoped_refptr& peer, - const std::string& msg, - const Status& s); - TSTabletManagerStatePB state() const { shared_lock l(lock_); return state_;