Skip to content

Commit

Permalink
KUDU-921. tablet copy: Make the StartTabletCopy() RPC async
Browse files Browse the repository at this point in the history
This patch changes tablet copy to execute on its own thread pool.
This clears up threads on the Consensus service pool for other tasks.

A new ThreadPool called tablet_copy_pool_ was added to TSTabletManager
to run TabletCopy operations. Its max_threads parameter is tunable with
a new gflag and it has its max_queue_size hard-coded to 0 in order to
provide backpressure when it doesn't have capacity to immediately copy
new tablets.

This patch changes the semantics of StartTabletCopy() to return as soon
as the tablet copy has started -- it no longer waits until the process
is completed to return. Clients can follow the progress of the tablet
copy process using the ListTablets() RPC call and waiting for the tablet
to be in a RUNNING state.

A test was added in tablet_copy-itest that checks that the backpressure
mechanism is working such that submitting too many StartTabletCopy()
requests at one time results in a ServiceUnavailable error.

Some additional tests were added in tablet_copy_client_session-itest to
improve test coverage of the StartTabletCopy() code path.

Change-Id: I95c63f2bfd67624844447862efbdba9cb3676112
Reviewed-on: http://gerrit.cloudera.org:8080/5045
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <[email protected]>
  • Loading branch information
mpercy authored and toddlipcon committed Dec 8, 2016
1 parent 2acf1ba commit bec5f9b
Show file tree
Hide file tree
Showing 10 changed files with 341 additions and 89 deletions.
13 changes: 7 additions & 6 deletions src/kudu/integration-tests/cluster_itest_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,19 +302,20 @@ Status DeleteTablet(const TServerDetails* ts,
// Repeatedly try to delete the tablet, retrying on failure up to the
// specified timeout. Deletion can fail when other operations, such as
// bootstrap or tablet copy, are running.
void DeleteTabletWithRetries(const TServerDetails* ts, const std::string& tablet_id,
void DeleteTabletWithRetries(const TServerDetails* ts,
const std::string& tablet_id,
tablet::TabletDataState delete_type,
const boost::optional<int64_t>& config_opid_index,
const MonoDelta& timeout);

// Cause the remote to initiate tablet copy using the specified host as a
// source.
Status StartTabletCopy(const TServerDetails* ts,
const std::string& tablet_id,
const std::string& copy_source_uuid,
const HostPort& copy_source_addr,
int64_t caller_term,
const MonoDelta& timeout);
const std::string& tablet_id,
const std::string& copy_source_uuid,
const HostPort& copy_source_addr,
int64_t caller_term,
const MonoDelta& timeout);

} // namespace itest
} // namespace kudu
Expand Down
75 changes: 72 additions & 3 deletions src/kudu/integration-tests/tablet_copy-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ class TabletCopyITest : public KuduTest {
};

void TabletCopyITest::StartCluster(const vector<string>& extra_tserver_flags,
const vector<string>& extra_master_flags,
int num_tablet_servers) {
const vector<string>& extra_master_flags,
int num_tablet_servers) {
ExternalMiniClusterOptions opts;
opts.num_tablet_servers = num_tablet_servers;
opts.extra_tserver_flags = extra_tserver_flags;
Expand Down Expand Up @@ -186,7 +186,7 @@ TEST_F(TabletCopyITest, TestRejectRogueLeader) {
0, // Say I'm from term 0.
timeout);
ASSERT_TRUE(s.IsInvalidArgument());
ASSERT_STR_CONTAINS(s.ToString(), "term 0 lower than last logged term 1");
ASSERT_STR_CONTAINS(s.ToString(), "term 0, which is lower than last-logged term 1");

// Now pause the actual leader so we can bring him back as a zombie later.
ASSERT_OK(cluster_->tablet_server(zombie_leader_index)->Pause());
Expand Down Expand Up @@ -895,4 +895,73 @@ TEST_F(TabletCopyITest, TestTabletCopyingDeletedTabletFails) {
ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, 1));
}

// Test that the tablet copy thread pool being full results in throttling and
// backpressure on the callers.
TEST_F(TabletCopyITest, TestTabletCopyThrottling) {
MonoDelta kTimeout = MonoDelta::FromSeconds(30);
const int kNumTablets = 4;
// We want 2 tablet servers and we don't want the master to interfere when we
// forcibly make copies of tablets onto servers it doesn't know about.
// We also want to make sure only one tablet copy is possible at a given time
// in order to test the throttling.
NO_FATALS(StartCluster({"--num_tablets_to_copy_simultaneously=1"},
{"--master_tombstone_evicted_tablet_replicas=false"},
2));
// Shut down the 2nd tablet server; we'll create tablets on the first one.
cluster_->tablet_server(1)->Shutdown();

// Restart the Master so it doesn't try to assign tablets to the dead TS.
cluster_->master()->Shutdown();
ASSERT_OK(cluster_->master()->Restart());
ASSERT_OK(cluster_->WaitForTabletServerCount(1, kTimeout));

// Write a bunch of data to the first tablet server.
TestWorkload workload(cluster_.get());
workload.set_num_write_threads(8);
workload.set_num_replicas(1);
workload.set_num_tablets(kNumTablets);
workload.Setup();
workload.Start();
while (workload.rows_inserted() < 10000) {
SleepFor(MonoDelta::FromMilliseconds(10));
}

TServerDetails* ts0 = ts_map_[cluster_->tablet_server(0)->uuid()];
TServerDetails* ts1 = ts_map_[cluster_->tablet_server(1)->uuid()];

vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
ASSERT_OK(WaitForNumTabletsOnTS(ts0, kNumTablets, kTimeout, &tablets));

workload.StopAndJoin();

// Now we attempt to copy all of that data over to the 2nd tablet server.
// We will attempt to copy 4 tablets simultanously, but because we have tuned
// the number of tablet copy threads down to 1, we should get at least one
// ServiceUnavailable error.
ASSERT_OK(cluster_->tablet_server(1)->Restart());

// Attempt to copy all of the tablets from TS0 to TS1.
// Collect the status messages.
vector<Status> statuses(kNumTablets);
for (const auto& t : tablets) {
HostPort src_addr;
ASSERT_OK(HostPortFromPB(ts0->registration.rpc_addresses(0), &src_addr));
statuses.push_back(StartTabletCopy(ts1, t.tablet_status().tablet_id(), ts0->uuid(),
src_addr, 0, kTimeout));
}

// The "Service unavailable" messages are serialized as RemoteError type.
// Ensure that we got at least one.
int num_service_unavailable = 0;
for (const Status& s : statuses) {
if (!s.ok()) {
ASSERT_TRUE(s.IsRemoteError()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "Service unavailable: Thread pool is at capacity");
num_service_unavailable++;
}
}
ASSERT_GT(num_service_unavailable, 0);
LOG(INFO) << "Number of Service unavailable responses: " << num_service_unavailable;
}

} // namespace kudu
73 changes: 73 additions & 0 deletions src/kudu/integration-tests/tablet_copy_client_session-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,77 @@ TEST_F(TabletCopyClientSessionITest, TestStartTabletCopyWhileSourceBootstrapping
}
}

// Test that StartTabletCopy() works in different scenarios.
TEST_F(TabletCopyClientSessionITest, TestStartTabletCopy) {
NO_FATALS(PrepareClusterForTabletCopy());

TServerDetails* ts0 = ts_map_[cluster_->tablet_server(0)->uuid()];
TServerDetails* ts1 = ts_map_[cluster_->tablet_server(1)->uuid()];
vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
ASSERT_OK(WaitForNumTabletsOnTS(ts0, kDefaultNumTablets, kDefaultTimeout, &tablets));
ASSERT_EQ(kDefaultNumTablets, tablets.size());
const string& tablet_id = tablets[0].tablet_status().tablet_id();

// Scenarios to run tablet copy on top of:
enum Scenarios {
kPristine, // No tablets.
kTombstoned, // A tombstoned tablet.
kLast
};
for (int scenario = 0; scenario < kLast; scenario++) {
if (scenario == kTombstoned) {
NO_FATALS(DeleteTabletWithRetries(ts1, tablet_id,
TabletDataState::TABLET_DATA_TOMBSTONED,
boost::none, kDefaultTimeout));
}

// Run tablet copy.
HostPort src_addr;
ASSERT_OK(HostPortFromPB(ts0->registration.rpc_addresses(0), &src_addr));
ASSERT_OK(StartTabletCopy(ts1, tablet_id, ts0->uuid(), src_addr,
std::numeric_limits<int64_t>::max(), kDefaultTimeout));
ASSERT_OK(WaitUntilTabletRunning(ts1, tablet_id, kDefaultTimeout));
}
}

// Test that a tablet copy session will tombstone the tablet if the source
// server crashes in the middle of the tablet copy.
TEST_F(TabletCopyClientSessionITest, TestCopyFromCrashedSource) {
NO_FATALS(PrepareClusterForTabletCopy());

TServerDetails* ts0 = ts_map_[cluster_->tablet_server(0)->uuid()];
TServerDetails* ts1 = ts_map_[cluster_->tablet_server(1)->uuid()];
vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
ASSERT_OK(WaitForNumTabletsOnTS(ts0, kDefaultNumTablets, kDefaultTimeout, &tablets));
ASSERT_EQ(kDefaultNumTablets, tablets.size());
const string& tablet_id = tablets[0].tablet_status().tablet_id();

// Crash when serving tablet copy.
ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(0),
"fault_crash_on_handle_tc_fetch_data",
"1.0"));

HostPort src_addr;
ASSERT_OK(HostPortFromPB(ts0->registration.rpc_addresses(0), &src_addr));
ASSERT_OK(StartTabletCopy(ts1, tablet_id, ts0->uuid(), src_addr,
std::numeric_limits<int64_t>::max(), kDefaultTimeout));

ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(1, tablet_id,
{ TabletDataState::TABLET_DATA_TOMBSTONED },
kDefaultTimeout));

// The source server will crash.
ASSERT_OK(cluster_->tablet_server(0)->WaitForInjectedCrash(kDefaultTimeout));
cluster_->tablet_server(0)->Shutdown();

// It will restart without the fault flag set.
ASSERT_OK(cluster_->tablet_server(0)->Restart());

// Attempt the copy again. This time it should succeed.
ASSERT_OK(WaitUntilTabletRunning(ts0, tablet_id, kDefaultTimeout));
ASSERT_OK(StartTabletCopy(ts1, tablet_id, ts0->uuid(), src_addr,
std::numeric_limits<int64_t>::max(), kDefaultTimeout));
ASSERT_OK(WaitUntilTabletRunning(ts1, tablet_id, kDefaultTimeout));
}

} // namespace kudu
9 changes: 5 additions & 4 deletions src/kudu/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2297,10 +2297,11 @@ const NodeInstancePB& CatalogManager::NodeInstance() const {
return master_->instance_pb();
}

Status CatalogManager::StartTabletCopy(
const StartTabletCopyRequestPB& req,
boost::optional<kudu::tserver::TabletServerErrorPB::Code>* error_code) {
return Status::NotSupported("Tablet Copy not yet implemented for the master tablet");
void CatalogManager::StartTabletCopy(
const StartTabletCopyRequestPB* /* req */,
std::function<void(const Status&, TabletServerErrorPB::Code)> cb) {
cb(Status::NotSupported("Tablet Copy not yet implemented for the master tablet"),
TabletServerErrorPB::UNKNOWN_ERROR);
}

// Interface used by RetryingTSRpcTask to pick the tablet server to
Expand Down
10 changes: 5 additions & 5 deletions src/kudu/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -494,15 +494,15 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
//
// See also: TabletPeerLookupIf, ConsensusServiceImpl.
virtual Status GetTabletPeer(const std::string& tablet_id,
scoped_refptr<tablet::TabletPeer>* tablet_peer) const OVERRIDE;
scoped_refptr<tablet::TabletPeer>* tablet_peer) const override;

virtual const NodeInstancePB& NodeInstance() const OVERRIDE;
virtual const NodeInstancePB& NodeInstance() const override;

bool IsInitialized() const;

virtual Status StartTabletCopy(
const consensus::StartTabletCopyRequestPB& req,
boost::optional<kudu::tserver::TabletServerErrorPB::Code>* error_code) OVERRIDE;
virtual void StartTabletCopy(
const consensus::StartTabletCopyRequestPB* req,
std::function<void(const Status&, tserver::TabletServerErrorPB::Code)> cb) override;

// Returns this CatalogManager's role in a consensus configuration. CatalogManager
// must be initialized before calling this method.
Expand Down
4 changes: 2 additions & 2 deletions src/kudu/tserver/tablet_copy_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ void TabletCopyServiceImpl::CheckSessionActive(
}

void TabletCopyServiceImpl::FetchData(const FetchDataRequestPB* req,
FetchDataResponsePB* resp,
rpc::RpcContext* context) {
FetchDataResponsePB* resp,
rpc::RpcContext* context) {
const string& session_id = req->session_id();

// Look up and validate tablet copy session.
Expand Down
6 changes: 4 additions & 2 deletions src/kudu/tserver/tablet_peer_lookup.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#define KUDU_TSERVER_TABLET_PEER_LOOKUP_H_

#include <boost/optional/optional_fwd.hpp>
#include <functional>
#include <memory>
#include <string>

Expand Down Expand Up @@ -51,8 +52,9 @@ class TabletPeerLookupIf {

virtual const NodeInstancePB& NodeInstance() const = 0;

virtual Status StartTabletCopy(const consensus::StartTabletCopyRequestPB& req,
boost::optional<TabletServerErrorPB::Code>* error_code) = 0;
virtual void StartTabletCopy(
const consensus::StartTabletCopyRequestPB* req,
std::function<void(const Status&, TabletServerErrorPB::Code)> cb) = 0;
};

} // namespace tserver
Expand Down
21 changes: 10 additions & 11 deletions src/kudu/tserver/tablet_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -988,20 +988,19 @@ void ConsensusServiceImpl::GetConsensusState(const consensus::GetConsensusStateR
}

void ConsensusServiceImpl::StartTabletCopy(const StartTabletCopyRequestPB* req,
StartTabletCopyResponsePB* resp,
rpc::RpcContext* context) {
StartTabletCopyResponsePB* resp,
rpc::RpcContext* context) {
if (!CheckUuidMatchOrRespond(tablet_manager_, "StartTabletCopy", req, resp, context)) {
return;
}
boost::optional<TabletServerErrorPB::Code> error_code;
Status s = tablet_manager_->StartTabletCopy(*req, &error_code);
if (!s.ok()) {
SetupErrorAndRespond(resp->mutable_error(), s,
error_code.get_value_or(TabletServerErrorPB::UNKNOWN_ERROR),
context);
return;
}
context->RespondSuccess();
auto response_callback = [context, resp](const Status& s, TabletServerErrorPB::Code error_code) {
if (!s.ok()) {
SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
return;
}
context->RespondSuccess();
};
tablet_manager_->StartTabletCopy(req, response_callback);
}

void TabletServiceImpl::ScannerKeepAlive(const ScannerKeepAliveRequestPB *req,
Expand Down
Loading

0 comments on commit bec5f9b

Please sign in to comment.