Skip to content

Commit

Permalink
[tool] KUDU-2181 CLI to add master
Browse files Browse the repository at this point in the history
This change adds a CLI that invokes the AddMaster RPC to
perform Raft ChangeConfig.

This CLI will be part of the workflow to migrate to multiple
masters in a Kudu cluster.

Change-Id: I507f301d1aba17327eb35728eed0d765e86ef4cc
Reviewed-on: http://gerrit.cloudera.org:8080/16530
Reviewed-by: Alexey Serbin <[email protected]>
Tested-by: Bankim Bhavsar <[email protected]>
Reviewed-by: Andrew Wong <[email protected]>
  • Loading branch information
bbhavsar committed Feb 19, 2021
1 parent 4a88bde commit df61b71
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 62 deletions.
17 changes: 10 additions & 7 deletions src/kudu/client/master_proxy_rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ using strings::Substitute;

namespace kudu {

using master::AddMasterRequestPB;
using master::AddMasterResponsePB;
using master::AlterTableRequestPB;
using master::AlterTableResponsePB;
using master::ChangeTServerStateRequestPB;
Expand Down Expand Up @@ -259,7 +261,7 @@ bool AsyncLeaderMasterRpc<ReqClass, RespClass>::RetryOrReconnectIfNecessary(
return true;
}
if (err->unsupported_feature_flags_size() > 0) {
s = Status::NotSupported(Substitute("Cluster is does not support $0",
s = Status::NotSupported(Substitute("Cluster does not support $0",
rpc_name_));
}
}
Expand All @@ -283,19 +285,20 @@ bool AsyncLeaderMasterRpc<ReqClass, RespClass>::RetryOrReconnectIfNecessary(
return false;
}

template class AsyncLeaderMasterRpc<AddMasterRequestPB, AddMasterResponsePB>;
template class AsyncLeaderMasterRpc<AlterTableRequestPB, AlterTableResponsePB>;
template class AsyncLeaderMasterRpc<ChangeTServerStateRequestPB, ChangeTServerStateResponsePB>;
template class AsyncLeaderMasterRpc<CreateTableRequestPB, CreateTableResponsePB>;
template class AsyncLeaderMasterRpc<IsCreateTableDoneRequestPB, IsCreateTableDoneResponsePB>;
template class AsyncLeaderMasterRpc<DeleteTableRequestPB, DeleteTableResponsePB>;
template class AsyncLeaderMasterRpc<AlterTableRequestPB, AlterTableResponsePB>;
template class AsyncLeaderMasterRpc<IsAlterTableDoneRequestPB, IsAlterTableDoneResponsePB>;
template class AsyncLeaderMasterRpc<GetTableSchemaRequestPB, GetTableSchemaResponsePB>;
template class AsyncLeaderMasterRpc<GetTableLocationsRequestPB, GetTableLocationsResponsePB>;
template class AsyncLeaderMasterRpc<GetTabletLocationsRequestPB, GetTabletLocationsResponsePB>;
template class AsyncLeaderMasterRpc<GetTableSchemaRequestPB, GetTableSchemaResponsePB>;
template class AsyncLeaderMasterRpc<GetTableStatisticsRequestPB, GetTableStatisticsResponsePB>;
template class AsyncLeaderMasterRpc<GetTabletLocationsRequestPB, GetTabletLocationsResponsePB>;
template class AsyncLeaderMasterRpc<IsAlterTableDoneRequestPB, IsAlterTableDoneResponsePB>;
template class AsyncLeaderMasterRpc<IsCreateTableDoneRequestPB, IsCreateTableDoneResponsePB>;
template class AsyncLeaderMasterRpc<ListMastersRequestPB, ListMastersResponsePB>;
template class AsyncLeaderMasterRpc<ListTablesRequestPB, ListTablesResponsePB>;
template class AsyncLeaderMasterRpc<ListTabletServersRequestPB, ListTabletServersResponsePB>;
template class AsyncLeaderMasterRpc<ListMastersRequestPB, ListMastersResponsePB>;
template class AsyncLeaderMasterRpc<ReplaceTabletRequestPB, ReplaceTabletResponsePB>;

} // namespace internal
Expand Down
110 changes: 65 additions & 45 deletions src/kudu/master/dynamic_multi_master-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/net/socket.h"
#include "kudu/util/random.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
Expand Down Expand Up @@ -320,26 +319,6 @@ class DynamicMultiMasterTest : public KuduTest {
ASSERT_EQ(orig_num_masters_, resp.masters_size());
}

// Adds the specified master to the cluster returning the appropriate error Status for negative
// test cases.
Status AddMasterToCluster(const HostPort& master) {
auto add_master = [&] (int leader_master_idx) {
AddMasterRequestPB req;
AddMasterResponsePB resp;
RpcController rpc;
if (master != HostPort()) {
*req.mutable_rpc_addr() = HostPortToPB(master);
}
rpc.RequireServerFeature(MasterFeatures::DYNAMIC_MULTI_MASTER);
Status s = cluster_->master_proxy(leader_master_idx)->AddMaster(req, &resp, &rpc);
boost::optional<MasterErrorPB::Code> err_code(resp.has_error(), resp.error().code());
return std::make_pair(s, err_code);
};

RETURN_NOT_OK(RunLeaderMasterRPC(add_master));
return cluster_->AddMaster(new_master_);
}

// Remove the master specified by 'hp' and optional 'master_uuid' from the cluster.
// Unset 'hp' can be used to indicate to not supply RPC address in the RemoveMaster RPC request.
Status RemoveMasterFromCluster(const HostPort& hp, const string& master_uuid = "") {
Expand Down Expand Up @@ -381,6 +360,36 @@ class DynamicMultiMasterTest : public KuduTest {
return Status::OK();
}

// Adds the specified master to the cluster using the CLI tool.
// Unset 'master' can be used to indicate to not supply master address.
// Optional 'wait_secs' can be used to supply wait timeout to the master add CLI tool.
// Returns generic RuntimeError() on failure with the actual error in the optional 'err'
// output parameter.
Status AddMasterToClusterUsingCLITool(const HostPort& master, string* err = nullptr,
int wait_secs = 0) {
auto hps = cluster_->master_rpc_addrs();
vector<string> addresses;
addresses.reserve(hps.size());
for (const auto& hp : hps) {
addresses.emplace_back(hp.ToString());
}

vector<string> cmd = {"master", "add", JoinStrings(addresses, ",")};
if (master != HostPort()) {
cmd.emplace_back(master.ToString());
}
if (wait_secs != 0) {
cmd.emplace_back("-wait_secs=" + std::to_string(wait_secs));
}
RETURN_NOT_OK(tools::RunKuduTool(cmd, nullptr, err));
// master add CLI doesn't return an error if the master is already present.
// So don't try adding to the ExternalMiniCluster.
if (err != nullptr && err->find("Master already present") != string::npos) {
return Status::OK();
}
return cluster_->AddMaster(new_master_);
}

// Verify one of the 'expected_roles' and 'expected_member_type' of the new master by
// making RPC to it directly.
void VerifyNewMasterDirectly(const set<consensus::RaftPeerPB::Role>& expected_roles,
Expand Down Expand Up @@ -639,7 +648,7 @@ TEST_P(ParameterizedAddMasterTest, TestAddMasterCatchupFromWAL) {
// Bring up the new master and add to the cluster.
master_hps.emplace_back(reserved_hp_);
NO_FATALS(StartNewMaster(master_hps));
ASSERT_OK(AddMasterToCluster(reserved_hp_));
ASSERT_OK(AddMasterToClusterUsingCLITool(reserved_hp_, nullptr, 4));

// Newly added master will be caught up from WAL itself without requiring tablet copy
// since the system catalog is fresh with a single table.
Expand All @@ -666,18 +675,18 @@ TEST_P(ParameterizedAddMasterTest, TestAddMasterCatchupFromWAL) {
VerifyNewMasterDirectly({ consensus::RaftPeerPB::FOLLOWER, consensus::RaftPeerPB::LEADER },
consensus::RaftPeerPB::VOTER);

// Adding the same master again should return an error.
// Adding the same master again should print a message but not throw an error.
{
Status s = AddMasterToCluster(reserved_hp_);
ASSERT_TRUE(s.IsRemoteError());
ASSERT_STR_CONTAINS(s.message().ToString(), "Master already present");
string err;
ASSERT_OK(AddMasterToClusterUsingCLITool(reserved_hp_, &err));
ASSERT_STR_CONTAINS(err, "Master already present");
}

// Adding one of the former masters should return an error.
// Adding one of the former masters should print a message but not throw an error.
{
Status s = AddMasterToCluster(master_hps[0]);
ASSERT_TRUE(s.IsRemoteError()) << s.ToString();
ASSERT_STR_CONTAINS(s.message().ToString(), "Master already present");
string err;
ASSERT_OK(AddMasterToClusterUsingCLITool(master_hps[0], &err));
ASSERT_STR_CONTAINS(err, "Master already present");
}

NO_FATALS(VerifyClusterAfterMasterAddition(master_hps));
Expand All @@ -695,7 +704,10 @@ TEST_P(ParameterizedAddMasterTest, TestAddMasterSysCatalogCopy) {
// Bring up the new master and add to the cluster.
master_hps.emplace_back(reserved_hp_);
NO_FATALS(StartNewMaster(master_hps));
ASSERT_OK(AddMasterToCluster(reserved_hp_));
string err;
ASSERT_OK(AddMasterToClusterUsingCLITool(reserved_hp_, &err));
ASSERT_STR_MATCHES(err, Substitute("Please follow the next steps which includes system catalog "
"tablet copy", reserved_hp_.ToString()));

// Newly added master will be added to the master Raft config but won't be caught up
// from the WAL and hence remain as a NON_VOTER.
Expand Down Expand Up @@ -908,10 +920,10 @@ TEST_F(DynamicMultiMasterTest, TestAddMasterWithNoLastKnownAddr) {
master_hps.emplace_back(reserved_hp_);
NO_FATALS(StartNewMaster(master_hps));

Status actual = AddMasterToCluster(reserved_hp_);
ASSERT_TRUE(actual.IsRemoteError()) << actual.ToString();
ASSERT_STR_MATCHES(actual.ToString(),
"Invalid config to set as pending: Peer:.* has no address");
string err;
Status actual = AddMasterToClusterUsingCLITool(reserved_hp_, &err);
ASSERT_TRUE(actual.IsRuntimeError()) << actual.ToString();
ASSERT_STR_MATCHES(err, "Invalid config to set as pending: Peer:.* has no address");

// Verify no change in number of masters.
NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
Expand All @@ -932,9 +944,10 @@ TEST_F(DynamicMultiMasterTest, TestAddMasterFeatureFlagNotSpecified) {
master_hps.emplace_back(reserved_hp_);
NO_FATALS(StartNewMaster(master_hps, false /* master_supports_change_config */));

Status actual = AddMasterToCluster(reserved_hp_);
ASSERT_TRUE(actual.IsRemoteError()) << actual.ToString();
ASSERT_STR_MATCHES(actual.ToString(), "unsupported feature flags");
string err;
Status actual = AddMasterToClusterUsingCLITool(reserved_hp_, &err);
ASSERT_TRUE(actual.IsRuntimeError()) << actual.ToString();
ASSERT_STR_MATCHES(err, "Cluster does not support AddMaster");

// Verify no change in number of masters.
NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
Expand Down Expand Up @@ -1056,15 +1069,22 @@ TEST_F(DynamicMultiMasterTest, TestAddMasterMissingAndIncorrectAddress) {
NO_FATALS(StartNewMaster(master_hps));

// Empty HostPort
Status actual = AddMasterToCluster(HostPort());
ASSERT_TRUE(actual.IsRemoteError()) << actual.ToString();
ASSERT_STR_CONTAINS(actual.ToString(), "RPC address of master to be added not supplied");
{
string err;
Status actual = AddMasterToClusterUsingCLITool(HostPort(), &err);
ASSERT_TRUE(actual.IsRuntimeError()) << actual.ToString();
ASSERT_STR_CONTAINS(err, "must provide positional argument master_address");
}

// Non-routable incorrect hostname.
actual = AddMasterToCluster(HostPort("non-existent-path.local", Master::kDefaultPort));
ASSERT_TRUE(actual.IsRemoteError()) << actual.ToString();
ASSERT_STR_CONTAINS(actual.ToString(),
"Network error: unable to resolve address for non-existent-path.local");
{
string err;
Status actual = AddMasterToClusterUsingCLITool(
HostPort("non-existent-path.local", Master::kDefaultPort), &err);
ASSERT_TRUE(actual.IsRuntimeError()) << actual.ToString();
ASSERT_STR_CONTAINS(err,
"Network error: unable to resolve address for non-existent-path.local");
}

// Verify no change in number of masters.
NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
Expand Down
3 changes: 3 additions & 0 deletions src/kudu/master/master.proto
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ message MasterErrorPB {

// The caller is not authorized to perform the attempted operation.
NOT_AUTHORIZED = 14;

// Master is already part of the Raft configuration.
MASTER_ALREADY_PRESENT = 15;
}

// The error code.
Expand Down
16 changes: 13 additions & 3 deletions src/kudu/master/master_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,20 @@ void MasterServiceImpl::AddMaster(const AddMasterRequestPB* req,
return;
}

Status s = server_->AddMaster(HostPortFromPB(req->rpc_addr()), rpc);
HostPort hp = HostPortFromPB(req->rpc_addr());
Status s = server_->AddMaster(hp, rpc);
if (!s.ok()) {
LOG(ERROR) << Substitute("Failed adding master $0:$1. $2", req->rpc_addr().host(),
req->rpc_addr().port(), s.ToString());
// Special handling for master already present error for retry scenarios.
// Responding back using RespondFailure() will clobber the error code
// and hence responding with success and setting the error code.
if (s.IsAlreadyPresent()) {
LOG(WARNING) << Substitute("Master $0 already present", hp.ToString());
StatusToPB(s, resp->mutable_error()->mutable_status());
resp->mutable_error()->set_code(MasterErrorPB::MASTER_ALREADY_PRESENT);
rpc->RespondSuccess();
return;
}
LOG(ERROR) << Substitute("Failed adding master $0. $1", hp.ToString(), s.ToString());
rpc->RespondFailure(s);
return;
}
Expand Down
7 changes: 7 additions & 0 deletions src/kudu/tools/kudu-tool-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,7 @@ TEST_F(ToolTest, TestModeHelp) {
"status.*Get the status",
"timestamp.*Get the current timestamp",
"list.*List masters in a Kudu cluster",
"add.*Add a master to the Raft configuration"
};
NO_FATALS(RunTestHelp(kCmd, kMasterModeRegexes));
NO_FATALS(RunTestHelpRpcFlags(kCmd,
Expand All @@ -1126,6 +1127,10 @@ TEST_F(ToolTest, TestModeHelp) {
NO_FATALS(RunTestHelp(kSubCmd, kMasterAuthzCacheModeRegexes));
NO_FATALS(RunTestHelpRpcFlags(kSubCmd, {"refresh"}));
}
{
NO_FATALS(RunTestHelp("master add --help",
{"-wait_secs \\(Timeout in seconds to wait for the newly added master"}));
}
{
const vector<string> kPbcModeRegexes = {
"dump.*Dump a PBC",
Expand Down Expand Up @@ -1314,6 +1319,8 @@ TEST_F(ToolTest, TestActionHelp) {

TEST_F(ToolTest, TestActionMissingRequiredArg) {
NO_FATALS(RunActionMissingRequiredArg("master list", "master_addresses"));
NO_FATALS(RunActionMissingRequiredArg("master add", "master_addresses"));
NO_FATALS(RunActionMissingRequiredArg("master add master.example.com", "master_address"));
NO_FATALS(RunActionMissingRequiredArg("cluster ksck --master_addresses=master.example.com",
"master_addresses"));
NO_FATALS(RunActionMissingRequiredArg("local_replica cmeta rewrite_raft_config fake_id",
Expand Down
33 changes: 27 additions & 6 deletions src/kudu/tools/tool_action_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -922,16 +922,30 @@ Status LeaderMasterProxy::SyncRpc(const Req& req,
const std::function<void(master::MasterServiceProxy*,
const Req&, Resp*,
rpc::RpcController*,
const ResponseCallback&)>& func) {
const ResponseCallback&)>& func,
std::vector<uint32_t> required_feature_flags) {
MonoTime deadline = MonoTime::Now() + MonoDelta::FromMilliseconds(FLAGS_timeout_ms);
Synchronizer sync;
AsyncLeaderMasterRpc<Req, Resp> rpc(deadline, client_.get(), BackoffType::EXPONENTIAL,
req, resp, func, std::move(func_name), sync.AsStatusCallback(), {});
req, resp, func, std::move(func_name), sync.AsStatusCallback(),
std::move(required_feature_flags));
rpc.SendRpc();
return sync.Wait();
}

// Explicit specializations for callers outside this compilation unit.
template
Status LeaderMasterProxy::SyncRpc(
const master::AddMasterRequestPB& req,
master::AddMasterResponsePB* resp,
string func_name,
const std::function<void(MasterServiceProxy*,
const master::AddMasterRequestPB&,
master::AddMasterResponsePB*,
RpcController*,
const ResponseCallback&)>& func,
std::vector<uint32_t> required_feature_flags);

template
Status LeaderMasterProxy::SyncRpc(
const master::ChangeTServerStateRequestPB& req,
Expand All @@ -941,7 +955,9 @@ Status LeaderMasterProxy::SyncRpc(
const master::ChangeTServerStateRequestPB&,
master::ChangeTServerStateResponsePB*,
RpcController*,
const ResponseCallback&)>& func);
const ResponseCallback&)>& func,
std::vector<uint32_t> required_feature_flags);

template
Status LeaderMasterProxy::SyncRpc(
const master::ListTabletServersRequestPB& req,
Expand All @@ -951,7 +967,9 @@ Status LeaderMasterProxy::SyncRpc(
const master::ListTabletServersRequestPB&,
master::ListTabletServersResponsePB*,
RpcController*,
const ResponseCallback&)>& func);
const ResponseCallback&)>& func,
std::vector<uint32_t> required_feature_flags);

template
Status LeaderMasterProxy::SyncRpc(
const master::ListMastersRequestPB& req,
Expand All @@ -961,7 +979,9 @@ Status LeaderMasterProxy::SyncRpc(
const master::ListMastersRequestPB&,
master::ListMastersResponsePB*,
RpcController*,
const ResponseCallback&)>& func);
const ResponseCallback&)>& func,
std::vector<uint32_t> required_feature_flags);

template
Status LeaderMasterProxy::SyncRpc(
const master::ReplaceTabletRequestPB& req,
Expand All @@ -971,7 +991,8 @@ Status LeaderMasterProxy::SyncRpc(
const master::ReplaceTabletRequestPB&,
master::ReplaceTabletResponsePB*,
RpcController*,
const ResponseCallback&)>& func);
const ResponseCallback&)>& func,
std::vector<uint32_t> required_feature_flags);

} // namespace tools
} // namespace kudu
3 changes: 2 additions & 1 deletion src/kudu/tools/tool_action_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ class LeaderMasterProxy {
const std::function<void(master::MasterServiceProxy*,
const Req&, Resp*,
rpc::RpcController*,
const rpc::ResponseCallback&)>& func)
const rpc::ResponseCallback&)>& func,
std::vector<uint32_t> required_feature_flags = {})
WARN_UNUSED_RESULT;

private:
Expand Down
Loading

0 comments on commit df61b71

Please sign in to comment.