Skip to content

Commit

Permalink
[master] KUDU-2181 Raft ChangeConfig request to remove a master
Browse files Browse the repository at this point in the history
This change adds RPC to request removal of an existing master from
a cluster. It builds on top of earlier change 9fc6c77 to add master.

Added tests for removal of non-leader master whether it's up
or down.

Change-Id: I76c03b8850faef60b65f85184c0a4db7cc6759ee
Reviewed-on: http://gerrit.cloudera.org:8080/16936
Reviewed-by: Alexey Serbin <[email protected]>
Tested-by: Bankim Bhavsar <[email protected]>
  • Loading branch information
bbhavsar committed Feb 8, 2021
1 parent 1a998fe commit 3c363d4
Show file tree
Hide file tree
Showing 10 changed files with 575 additions and 38 deletions.
5 changes: 5 additions & 0 deletions src/kudu/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5623,6 +5623,7 @@ const char* CatalogManager::StateToString(State state) {
const char* CatalogManager::ChangeConfigOpToString(ChangeConfigOp type) {
switch (type) {
case CatalogManager::kAddMaster: return "add";
case CatalogManager::kRemoveMaster: return "remove";
}
__builtin_unreachable();
}
Expand Down Expand Up @@ -5667,6 +5668,9 @@ Status CatalogManager::InitiateMasterChangeConfig(ChangeConfigOp op, const HostP
peer->set_member_type(RaftPeerPB::NON_VOTER);
peer->mutable_attrs()->set_promote(true);
break;
case CatalogManager::kRemoveMaster:
req.set_type(consensus::REMOVE_PEER);
break;
default:
LOG(FATAL) << "Unsupported ChangeConfig operation: " << op;
}
Expand Down Expand Up @@ -5825,6 +5829,7 @@ INITTED_AND_LEADER_OR_RESPOND(GetTableLocationsResponsePB);
INITTED_AND_LEADER_OR_RESPOND(GetTableSchemaResponsePB);
INITTED_AND_LEADER_OR_RESPOND(GetTableStatisticsResponsePB);
INITTED_AND_LEADER_OR_RESPOND(GetTabletLocationsResponsePB);
INITTED_AND_LEADER_OR_RESPOND(RemoveMasterResponsePB);
INITTED_AND_LEADER_OR_RESPOND(ReplaceTabletResponsePB);

#undef INITTED_OR_RESPOND
Expand Down
3 changes: 2 additions & 1 deletion src/kudu/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,8 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
static std::string NormalizeTableName(const std::string& table_name);

enum ChangeConfigOp {
kAddMaster
kAddMaster,
kRemoveMaster
};

// Add/remove a master specified by 'hp' and 'uuid' by initiating change config request.
Expand Down
436 changes: 401 additions & 35 deletions src/kudu/master/dynamic_multi_master-test.cc

Large diffs are not rendered by default.

83 changes: 83 additions & 0 deletions src/kudu/master/master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "kudu/fs/error_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/catalog_manager.h"
#include "kudu/master/location_cache.h"
Expand Down Expand Up @@ -513,5 +514,87 @@ Status Master::AddMaster(const HostPort& hp, rpc::RpcContext* rpc) {
rpc);
}

Status Master::RemoveMaster(const HostPort& hp, const string& uuid, rpc::RpcContext* rpc) {
// Ensure requested master to be removed is part of list of masters.
auto consensus = catalog_manager_->master_consensus();
if (!consensus) {
return Status::IllegalState("consensus not running");
}
consensus::RaftConfigPB config = consensus->CommittedConfig();

// We can't allow removing a master from a single master configuration. Following
// check ensures a more appropriate error message is returned in case the removal
// was targeted for a different cluster.
if (config.peers_size() == 1) {
bool hp_found;
if (!config.peers(0).has_last_known_addr()) {
// In non-distributed master configurations, we may not store our own
// last known address in the Raft config.
DCHECK(registration_initialized_.load());
DCHECK_GT(registration_.rpc_addresses_size(), 0);
const auto& addresses = registration_.rpc_addresses();
hp_found = std::find_if(addresses.begin(), addresses.end(),
[&hp](const auto &hp_pb) {
return HostPortFromPB(hp_pb) == hp;
}) != addresses.end();
} else {
hp_found = HostPortFromPB(config.peers(0).last_known_addr()) == hp;
}
if (hp_found) {
return Status::InvalidArgument(Substitute("Can't remove master $0 in a single master "
"configuration", hp.ToString()));
}
return Status::NotFound(Substitute("Master $0 not found", hp.ToString()));
}

// UUIDs of masters matching the supplied HostPort 'hp' to remove.
vector<string> matching_masters;
for (const auto& peer : config.peers()) {
if (peer.has_last_known_addr() && HostPortFromPB(peer.last_known_addr()) == hp) {
matching_masters.push_back(peer.permanent_uuid());
}
}

string matching_uuid;
if (PREDICT_TRUE(matching_masters.size() == 1)) {
if (!uuid.empty() && uuid != matching_masters[0]) {
return Status::InvalidArgument(Substitute("Mismatch in UUID of the master $0 to be removed. "
"Expected: $1, supplied: $2.", hp.ToString(),
matching_masters[0], uuid));
}
matching_uuid = matching_masters[0];
} else if (matching_masters.empty()) {
return Status::NotFound(Substitute("Master $0 not found", hp.ToString()));
} else {
// We found multiple masters with matching HostPorts. Use the optional uuid to
// disambiguate, if possible.
DCHECK_GE(matching_masters.size(), 2);
if (!uuid.empty()) {
int matching_uuids_count = std::count(matching_masters.begin(), matching_masters.end(), uuid);
if (matching_uuids_count == 1) {
matching_uuid = uuid;
} else {
LOG(FATAL) << Substitute("Found multiple masters with same RPC address $0 and UUID $1",
hp.ToString(), uuid);
}
} else {
// Uuid not supplied and we found multiple matching HostPorts.
return Status::InvalidArgument(Substitute("Found multiple masters with same RPC address $0 "
"and following UUIDs $1. Supply UUID to "
"disambiguate.", hp.ToString(),
JoinStrings(matching_masters, ",")));
}
}

if (matching_uuid == fs_manager_->uuid()) {
return Status::InvalidArgument(Substitute("Can't remove the leader master $0", hp.ToString()));
}

// No early validation for whether a config change is in progress.
// If it's in progress, on initiating config change Raft will return error.
return catalog_manager()->InitiateMasterChangeConfig(CatalogManager::kRemoveMaster, hp,
matching_uuid, rpc);
}

} // namespace master
} // namespace kudu
8 changes: 7 additions & 1 deletion src/kudu/master/master.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <atomic>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include "kudu/common/wire_protocol.pb.h"
Expand Down Expand Up @@ -123,9 +124,14 @@ class Master : public kserver::KuduServer {

// Adds the master specified by 'hp' by initiating change config request.
// RpContext 'rpc' will be used to respond back to the client asynchronously.
// Returns the status of the master addition request.
// Returns the status of initiating the master addition request.
Status AddMaster(const HostPort& hp, rpc::RpcContext* rpc);

// Removes the master specified by 'hp' and optional 'uuid' by initiating change config request.
// RpContext 'rpc' will be used to respond back to the client asynchronously.
// Returns the status of initiating the master removal request.
Status RemoveMaster(const HostPort& hp, const std::string& uuid, rpc::RpcContext* rpc);

MaintenanceManager* maintenance_manager() {
return maintenance_manager_.get();
}
Expand Down
18 changes: 17 additions & 1 deletion src/kudu/master/master.proto
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,17 @@ message AddMasterResponsePB {
optional MasterErrorPB error = 1;
}

message RemoveMasterRequestPB {
// HostPort of the master to be removed
optional HostPortPB rpc_addr = 1;
// Optional UUID of the master to be removed.
optional string master_uuid = 2;
}

message RemoveMasterResponsePB {
optional MasterErrorPB error = 1;
}

// GetMasterRegistrationRequest/Response: get the instance id and
// HTTP/RPC addresses for this Master server.
message GetMasterRegistrationRequestPB {
Expand Down Expand Up @@ -1103,11 +1114,16 @@ service MasterService {
option (kudu.rpc.authz_method) = "AuthorizeSuperUser";
}

// Add a new master to existing cluster.
// Add a new master to the existing cluster.
rpc AddMaster(AddMasterRequestPB) returns (AddMasterResponsePB) {
option (kudu.rpc.authz_method) = "AuthorizeSuperUser";
}

// Remove a master from the existing cluster.
rpc RemoveMaster(RemoveMasterRequestPB) returns (RemoveMasterResponsePB) {
option (kudu.rpc.authz_method) = "AuthorizeSuperUser";
}

// Master->Master RPCs
// ------------------------------------------------------------

Expand Down
40 changes: 40 additions & 0 deletions src/kudu/master/master_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ void MasterServiceImpl::ChangeTServerState(const ChangeTServerStateRequestPB* re
void MasterServiceImpl::AddMaster(const AddMasterRequestPB* req,
AddMasterResponsePB* resp,
rpc::RpcContext* rpc) {
// This feature flag is part of SupportsFeature() function in master service but it's possible
// that a client may not specify the feature flag.
// This check protects access to the server feature irrespective of whether the
// feature flag is specified in the client.
if (!FLAGS_master_support_change_config) {
rpc->RespondFailure(Status::NotSupported("Adding master is not supported"));
return;
Expand Down Expand Up @@ -271,6 +275,42 @@ void MasterServiceImpl::AddMaster(const AddMasterRequestPB* req,
// See completion_cb in CatalogManager::InitiateMasterChangeConfig().
}

void MasterServiceImpl::RemoveMaster(const RemoveMasterRequestPB* req,
RemoveMasterResponsePB* resp,
rpc::RpcContext* rpc) {
// This feature flag is part of SupportsFeature() function in master service but it's possible
// that a client may not specify the feature flag.
// This check protects access to the server feature irrespective of whether the
// feature flag is specified in the client.
if (!FLAGS_master_support_change_config) {
rpc->RespondFailure(Status::NotSupported("Removing master is not supported"));
return;
}

if (!req->has_rpc_addr()) {
rpc->RespondFailure(Status::InvalidArgument(
"RPC address of the master to be removed not supplied"));
return;
}

CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
return;
}

HostPort hp = HostPortFromPB(req->rpc_addr());
string uuid = req->has_master_uuid() ? req->master_uuid() : string();
Status s = server_->RemoveMaster(hp, uuid, rpc);
if (!s.ok()) {
LOG(ERROR) << Substitute("Failed removing master $0. $1", hp.ToString(), s.ToString());
rpc->RespondFailure(s);
return;
}
// ChangeConfig request successfully submitted. Once the ChangeConfig request is complete
// the completion callback will respond back with the result to the RPC client.
// See completion_cb in CatalogManager::InitiateMasterChangeConfig().
}

void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req,
TSHeartbeatResponsePB* resp,
rpc::RpcContext* rpc) {
Expand Down
5 changes: 5 additions & 0 deletions src/kudu/master/master_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class PingRequestPB;
class PingResponsePB;
class RefreshAuthzCacheRequestPB;
class RefreshAuthzCacheResponsePB;
class RemoveMasterRequestPB;
class RemoveMasterResponsePB;
class ReplaceTabletRequestPB;
class ReplaceTabletResponsePB;
class TSHeartbeatRequestPB;
Expand Down Expand Up @@ -109,6 +111,9 @@ class MasterServiceImpl : public MasterServiceIf {
void AddMaster(const AddMasterRequestPB* req,
AddMasterResponsePB* resp, rpc::RpcContext* rpc) override;

void RemoveMaster(const RemoveMasterRequestPB* req,
RemoveMasterResponsePB* resp, rpc::RpcContext* rpc) override;

void Ping(const PingRequestPB* req,
PingResponsePB* resp,
rpc::RpcContext* rpc) override;
Expand Down
10 changes: 10 additions & 0 deletions src/kudu/mini-cluster/external_mini_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,16 @@ Status ExternalMiniCluster::AddMaster(const scoped_refptr<ExternalMaster>& new_m
return Status::OK();
}

Status ExternalMiniCluster::RemoveMaster(const HostPort& hp) {
for (auto it = masters_.begin(); it != masters_.end(); ++it) {
if ((*it)->bound_rpc_hostport() == hp) {
masters_.erase(it);
return Status::OK();
}
}
return Status::NotFound(Substitute("Master $0 not found in ExternalMiniCluster", hp.ToString()));
}

//------------------------------------------------------------
// ExternalDaemon
//------------------------------------------------------------
Expand Down
5 changes: 5 additions & 0 deletions src/kudu/mini-cluster/external_mini_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,11 @@ class ExternalMiniCluster : public MiniCluster {
// dynamically after bringing up the ExternalMiniCluster.
Status AddMaster(const scoped_refptr<ExternalMaster>& master);

// Removes any bookkeeping of the master specified by 'hp' from the ExternalMiniCluster
// after already having run through a successful master Raft change config to remove it.
// This helps keep the state of the actual cluster in sync with the state in ExternalMiniCluster.
Status RemoveMaster(const HostPort& hp);

private:
Status StartMasters();

Expand Down

0 comments on commit 3c363d4

Please sign in to comment.