diff --git a/src/kudu/master/CMakeLists.txt b/src/kudu/master/CMakeLists.txt index 2706e47907..dafeea680b 100644 --- a/src/kudu/master/CMakeLists.txt +++ b/src/kudu/master/CMakeLists.txt @@ -108,7 +108,7 @@ SET_KUDU_TEST_LINK_LIBS( ADD_KUDU_TEST(auto_rebalancer-test) ADD_KUDU_TEST(catalog_manager-test) -ADD_KUDU_TEST(dynamic_multi_master-test) +ADD_KUDU_TEST(dynamic_multi_master-test NUM_SHARDS 4) ADD_KUDU_TEST(hms_notification_log_listener-test) ADD_KUDU_TEST(location_cache-test DATA_FILES ../scripts/first_argument.sh) ADD_KUDU_TEST(master_options-test) diff --git a/src/kudu/master/dynamic_multi_master-test.cc b/src/kudu/master/dynamic_multi_master-test.cc index 09a1d7b118..9c3ac291e1 100644 --- a/src/kudu/master/dynamic_multi_master-test.cc +++ b/src/kudu/master/dynamic_multi_master-test.cc @@ -18,9 +18,10 @@ #include #include #include +#include +#include #include #include -#include #include #include #include @@ -28,6 +29,7 @@ #include #include +#include #include #include @@ -39,6 +41,8 @@ #include "kudu/consensus/consensus.pb.h" #include "kudu/consensus/consensus.proxy.h" #include "kudu/consensus/metadata.pb.h" +#include "kudu/fs/dir_manager.h" +#include "kudu/fs/fs_manager.h" #include "kudu/gutil/map-util.h" #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/join.h" @@ -54,6 +58,8 @@ #include "kudu/rpc/rpc_controller.h" #include "kudu/tools/tool_test_util.h" #include "kudu/tserver/tserver.pb.h" +#include "kudu/util/env.h" +#include "kudu/util/env_util.h" #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" #include "kudu/util/net/net_util.h" @@ -64,6 +70,9 @@ #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" +DECLARE_string(fs_wal_dir); +DECLARE_string(fs_data_dirs); + METRIC_DECLARE_histogram(log_gc_duration); METRIC_DECLARE_entity(tablet); @@ -85,7 +94,7 @@ using kudu::consensus::LeaderStepDownResponsePB; using kudu::consensus::RaftConfigPB; using kudu::consensus::RaftPeerPB; using kudu::rpc::RpcController; -using std::set; +using std::map; using std::string; using std::tuple; using std::unique_ptr; @@ -115,6 +124,8 @@ static Status CreateTable(ExternalMiniCluster* cluster, // Test class for testing addition/removal of masters to a Kudu cluster. class DynamicMultiMasterTest : public KuduTest { protected: + typedef map EnvVars; + void SetUpWithNumMasters(int num_masters) { // Initial number of masters in the cluster before adding a master. orig_num_masters_ = num_masters; @@ -294,51 +305,38 @@ class DynamicMultiMasterTest : public KuduTest { void StartNewMaster(const vector& master_hps, const HostPort& new_master_hp, int new_master_idx, - bool master_supports_change_config = true) { + scoped_refptr* new_master_out) { vector master_addresses; master_addresses.reserve(master_hps.size()); for (const auto& hp : master_hps) { master_addresses.emplace_back(hp.ToString()); } - // Start with an existing master daemon's options, but modify them for use in a new master - ExternalDaemonOptions new_master_opts = cluster_->master(0)->opts(); - const string new_master_id = Substitute("master-$0", new_master_idx); - new_master_opts.wal_dir = cluster_->GetWalPath(new_master_id); - new_master_opts.data_dirs = cluster_->GetDataPaths(new_master_id); - new_master_opts.log_dir = cluster_->GetLogPath(new_master_id); - new_master_opts.rpc_bind_address = new_master_hp; + ExternalDaemonOptions new_master_opts; + ASSERT_OK(BuildMasterOpts(new_master_idx, new_master_hp, &new_master_opts)); auto& flags = new_master_opts.extra_flags; flags.insert(flags.end(), {"--master_addresses=" + JoinStrings(master_addresses, ","), "--master_address_add_new_master=" + new_master_hp.ToString()}); LOG(INFO) << "Bringing up the new master at: " << new_master_hp.ToString(); - new_master_.reset(new ExternalMaster(new_master_opts)); - ASSERT_OK(new_master_->Start()); - ASSERT_OK(new_master_->WaitForCatalogManager()); + scoped_refptr master = new ExternalMaster(new_master_opts); + ASSERT_OK(master->Start()); + ASSERT_OK(master->WaitForCatalogManager()); Sockaddr new_master_addr; ASSERT_OK(SockaddrFromHostPort(new_master_hp, &new_master_addr)); - new_master_proxy_.reset( - new MasterServiceProxy(new_master_opts.messenger, new_master_addr, new_master_hp.host())); - { - GetMasterRegistrationRequestPB req; - GetMasterRegistrationResponsePB resp; - RpcController rpc; + MasterServiceProxy new_master_proxy(new_master_opts.messenger, new_master_addr, + new_master_hp.host()); - ASSERT_OK(new_master_proxy_->GetMasterRegistration(req, &resp, &rpc)); - ASSERT_FALSE(resp.has_error()); - if (master_supports_change_config) { - ASSERT_EQ(RaftPeerPB::NON_VOTER, resp.member_type()); - ASSERT_EQ(RaftPeerPB::LEARNER, resp.role()); - } else { - // For a new master brought that doesn't support change config, it'll be started - // as a VOTER and become FOLLOWER if the other masters are reachable. - ASSERT_EQ(RaftPeerPB::VOTER, resp.member_type()); - ASSERT_EQ(RaftPeerPB::FOLLOWER, resp.role()); - } - } + GetMasterRegistrationRequestPB req; + GetMasterRegistrationResponsePB resp; + RpcController rpc; + ASSERT_OK(new_master_proxy.GetMasterRegistration(req, &resp, &rpc)); + ASSERT_FALSE(resp.has_error()); + ASSERT_EQ(RaftPeerPB::NON_VOTER, resp.member_type()); + ASSERT_EQ(RaftPeerPB::LEARNER, resp.role()); + *new_master_out = std::move(master); } // Fetch a follower (non-leader) master index from the cluster. @@ -359,13 +357,44 @@ class DynamicMultiMasterTest : public KuduTest { return Status::OK(); } + // Builds and returns ExternalDaemonOptions used to add master with address 'rpc_addr' and + // 'master_idx' which helps with naming master's directories. + // Output 'kerberos_env_vars' parameter must be supplied for a secure cluster i.e. when Kerberos + // is enabled and returns Kerberos related environment variables. + Status BuildMasterOpts(int master_idx, const HostPort& rpc_addr, + ExternalDaemonOptions* master_opts, + EnvVars* kerberos_env_vars = nullptr) { + // Start with an existing master daemon's options, but modify them for use in a new master. + const string new_master_id = Substitute("master-$0", master_idx); + ExternalDaemonOptions opts = cluster_->master(0)->opts(); + opts.rpc_bind_address = rpc_addr; + opts.wal_dir = cluster_->GetWalPath(new_master_id); + opts.data_dirs = cluster_->GetDataPaths(new_master_id); + opts.log_dir = cluster_->GetLogPath(new_master_id); + if (opts_.enable_kerberos) { + CHECK(kerberos_env_vars); + vector flags; + RETURN_NOT_OK(cluster::ExternalDaemon::CreateKerberosConfig( + cluster_->kdc(), opts_.principal, rpc_addr.host(), &flags, kerberos_env_vars)); + // Inserting the Kerberos related flags at the end will override flags from the master + // which was used as a basis for the new master. + opts.extra_flags.insert(opts.extra_flags.end(), std::make_move_iterator(flags.begin()), + std::make_move_iterator(flags.end())); + } + *master_opts = std::move(opts); + return Status::OK(); + } + // Adds the specified master to the cluster using the CLI tool. - // Unset 'master' can be used to indicate to not supply master address. + // Unset 'rpc_bind_address' in 'opts' can be used to indicate to not supply master address. + // Optional 'env_vars' can be used to set environment variables while running kudu tool. // Optional 'wait_secs' can be used to supply wait timeout to the master add CLI tool. + // Optional 'kudu_binary' can be used to supply the path to the kudu binary. // Returns generic RuntimeError() on failure with the actual error in the optional 'err' // output parameter. - Status AddMasterToClusterUsingCLITool(const HostPort& master, string* err = nullptr, - int wait_secs = 0) { + Status AddMasterToClusterUsingCLITool(const ExternalDaemonOptions& opts, string* err = nullptr, + EnvVars env_vars = {}, int wait_secs = 0, + const string& kudu_binary = "") { auto hps = cluster_->master_rpc_addrs(); vector addresses; addresses.reserve(hps.size()); @@ -374,19 +403,29 @@ class DynamicMultiMasterTest : public KuduTest { } vector cmd = {"master", "add", JoinStrings(addresses, ",")}; - if (master.Initialized()) { - cmd.emplace_back(master.ToString()); + if (opts.rpc_bind_address.Initialized()) { + cmd.emplace_back(opts.rpc_bind_address.ToString()); } + + vector new_master_flags = ExternalMaster::GetMasterFlags(opts); + cmd.insert(cmd.end(), std::make_move_iterator(new_master_flags.begin()), + std::make_move_iterator(new_master_flags.end())); + cmd.insert(cmd.end(), opts.extra_flags.begin(), opts.extra_flags.end()); + if (wait_secs != 0) { cmd.emplace_back("-wait_secs=" + std::to_string(wait_secs)); } - RETURN_NOT_OK(tools::RunKuduTool(cmd, nullptr, err)); - // master add CLI doesn't return an error if the master is already present. - // So don't try adding to the ExternalMiniCluster. - if (err != nullptr && err->find("Master already present") != string::npos) { - return Status::OK(); + if (!kudu_binary.empty()) { + cmd.emplace_back("-kudu_abs_path=" + kudu_binary); + } + + RETURN_NOT_OK(env_util::CreateDirsRecursively(Env::Default(), opts.log_dir)); + Status s = tools::RunKuduTool(cmd, nullptr, err, "", std::move(env_vars)); + if (!s.ok() && err != nullptr) { + LOG(INFO) << "Add master stderr: " << *err; } - return cluster_->AddMaster(new_master_); + RETURN_NOT_OK(s); + return Status::OK(); } // Removes the specified master from the cluster using the CLI tool. @@ -414,20 +453,6 @@ class DynamicMultiMasterTest : public KuduTest { return cluster_->RemoveMaster(master_to_remove); } - // Verify one of the 'expected_roles' and 'expected_member_type' of the new master by - // making RPC to it directly. - void VerifyNewMasterDirectly(const set& expected_roles, - RaftPeerPB::MemberType expected_member_type) { - GetMasterRegistrationRequestPB req; - GetMasterRegistrationResponsePB resp; - RpcController rpc; - - ASSERT_OK(new_master_proxy_->GetMasterRegistration(req, &resp, &rpc)); - ASSERT_FALSE(resp.has_error()); - ASSERT_TRUE(ContainsKey(expected_roles, resp.role())); - ASSERT_EQ(expected_member_type, resp.member_type()); - } - // Fetch consensus state of the leader master. void GetLeaderMasterConsensusState(RaftConfigPB* consensus_config) { int leader_master_idx; @@ -452,22 +477,6 @@ class DynamicMultiMasterTest : public KuduTest { cstate.pending_config() : cstate.committed_config(); } - // Verify the newly added master is in FAILED_UNRECOVERABLE state and can't be caught up - // from WAL. - void VerifyNewMasterInFailedUnrecoverableState(int expected_num_masters) { - RaftConfigPB config; - NO_FATALS(GetLeaderMasterConsensusState(&config)); - ASSERT_EQ(expected_num_masters, config.peers_size()); - int num_new_masters_found = 0; - for (const auto& peer : config.peers()) { - if (peer.permanent_uuid() == new_master_->uuid()) { - ASSERT_EQ(HealthReportPB::FAILED_UNRECOVERABLE, peer.health_report().overall_health()); - num_new_masters_found++; - } - } - ASSERT_EQ(1, num_new_masters_found); - } - void VerifyDeadMasterInSpecifiedState(const string& dead_master_uuid, HealthReportPB::HealthStatus expected_state) { RaftConfigPB config; @@ -522,6 +531,21 @@ class DynamicMultiMasterTest : public KuduTest { }); } + // Fetch uuid of the specified 'fs_wal_dir' and 'fs_data_dirs'. + static Status GetFsUuid(const string& fs_wal_dir, const vector& fs_data_dirs, + string* uuid) { + google::FlagSaver saver; + FLAGS_fs_wal_dir = fs_wal_dir; + FLAGS_fs_data_dirs = JoinStrings(fs_data_dirs, ","); + FsManagerOpts fs_opts; + fs_opts.read_only = true; + fs_opts.update_instances = fs::UpdateInstanceBehavior::DONT_UPDATE; + FsManager fs_manager(Env::Default(), std::move(fs_opts)); + RETURN_NOT_OK(fs_manager.PartialOpen()); + *uuid = fs_manager.uuid(); + return Status::OK(); + } + // Verification steps after the new master has been added successfully and it's promoted // as VOTER. The supplied 'master_hps' includes the new_master as well. void VerifyClusterAfterMasterAddition(const vector& master_hps, @@ -535,13 +559,14 @@ class DynamicMultiMasterTest : public KuduTest { for (int i = 0; i < cluster_->num_masters(); i++) { master_uuids.emplace(cluster_->master(i)->uuid()); } - master_uuids.emplace(new_master_->uuid()); + string new_master_uuid; + ASSERT_OK(GetFsUuid(new_master_opts_.wal_dir, new_master_opts_.data_dirs, &new_master_uuid)); + master_uuids.emplace(new_master_uuid); // Shutdown the cluster and the new master daemon process. // This allows ExternalMiniCluster to manage the newly added master and allows // client to connect to the new master if it's elected the leader. LOG(INFO) << "Shutting down the old cluster"; - new_master_->Shutdown(); cluster_.reset(); LOG(INFO) << "Bringing up the migrated cluster"; @@ -576,7 +601,7 @@ class DynamicMultiMasterTest : public KuduTest { } // Transfer leadership to the new master. - NO_FATALS(TransferMasterLeadership(&migrated_cluster, new_master_->uuid())); + NO_FATALS(TransferMasterLeadership(&migrated_cluster, new_master_uuid)); shared_ptr client; ASSERT_OK(migrated_cluster.CreateClient(nullptr, &client)); @@ -694,61 +719,6 @@ class DynamicMultiMasterTest : public KuduTest { *dead_master_hp = master_to_recover_hp; } - // Helper function that verifies that the newly added master can't be caught up from WAL - // and remains as NON_VOTER. - void VerifyNewNonVoterMaster(const HostPort& new_master_hp, - int expected_num_masters) { - // Newly added master will be added to the master Raft config but won't be caught up - // from the WAL and hence remain as a NON_VOTER. - ListMastersResponsePB list_resp; - NO_FATALS(RunListMasters(&list_resp)); - ASSERT_EQ(expected_num_masters, list_resp.masters_size()); - - for (const auto& master : list_resp.masters()) { - ASSERT_EQ(1, master.registration().rpc_addresses_size()); - auto hp = HostPortFromPB(master.registration().rpc_addresses(0)); - if (hp == new_master_hp) { - ASSERT_EQ(RaftPeerPB::NON_VOTER, master.member_type()); - ASSERT_TRUE(master.role() == RaftPeerPB::LEARNER); - } - } - - // Double check by directly contacting the new master. - NO_FATALS(VerifyNewMasterDirectly({ RaftPeerPB::LEARNER }, RaftPeerPB::NON_VOTER)); - - // Verify new master is in FAILED_UNRECOVERABLE state. - // This health state update may take some round trips between the masters and - // hence wrapping it under ASSERT_EVENTUALLY. - ASSERT_EVENTUALLY([&] { - NO_FATALS(VerifyNewMasterInFailedUnrecoverableState(expected_num_masters)); - }); - } - - // Helper function to copy system catalog from 'src_master_hp' master. - void CopySystemCatalog(const HostPort& src_master_hp) { - LOG(INFO) << "Shutting down the new master"; - new_master_->Shutdown(); - - LOG(INFO) << "Deleting the system catalog"; - // Delete sys catalog on local master - ASSERT_OK(tools::RunKuduTool({"local_replica", "delete", - master::SysCatalogTable::kSysCatalogTabletId, - "-fs_data_dirs=" + JoinStrings(new_master_->data_dirs(), ","), - "-fs_wal_dir=" + new_master_->wal_dir(), - "-clean_unsafe"})); - - // Copy from remote system catalog from specified master - LOG(INFO) << "Copying from remote master: " << src_master_hp.ToString(); - ASSERT_OK(tools::RunKuduTool({"local_replica", "copy_from_remote", - master::SysCatalogTable::kSysCatalogTabletId, - src_master_hp.ToString(), - "-fs_data_dirs=" + JoinStrings(new_master_->data_dirs(), ","), - "-fs_wal_dir=" + new_master_->wal_dir()})); - - LOG(INFO) << "Restarting the new master"; - ASSERT_OK(new_master_->Restart()); - } - // Tracks the current number of masters in the cluster int orig_num_masters_; ExternalMiniClusterOptions opts_; @@ -758,26 +728,31 @@ class DynamicMultiMasterTest : public KuduTest { unique_ptr reserved_socket_; Sockaddr reserved_addr_; HostPort reserved_hp_; - unique_ptr new_master_proxy_; - scoped_refptr new_master_; + ExternalDaemonOptions new_master_opts_; static const char* const kTableName; }; const char* const DynamicMultiMasterTest::kTableName = "first_table"; -// Parameterized DynamicMultiMasterTest class that works with different initial number of masters. +// Parameterized DynamicMultiMasterTest class that works with different initial number of masters +// and secure/unsecure clusters. class ParameterizedAddMasterTest : public DynamicMultiMasterTest, - public ::testing::WithParamInterface { + public ::testing::WithParamInterface> { public: void SetUp() override { - NO_FATALS(SetUpWithNumMasters(GetParam())); + NO_FATALS(SetUpWithNumMasters(std::get<0>(GetParam()))); + opts_.enable_kerberos = std::get<1>(GetParam()); } }; INSTANTIATE_TEST_SUITE_P(, ParameterizedAddMasterTest, - // Initial number of masters in the cluster before adding a new master - ::testing::Values(1, 2)); + ::testing::Combine( + // Initial number of masters in the cluster before adding a new + // master + ::testing::Values(1, 2), + // Whether Kerberos is enabled + ::testing::Bool())); // This test starts a cluster, creates a table and then adds a new master. // For a system catalog with little data, the new master can be caught up from WAL and @@ -794,38 +769,40 @@ TEST_P(ParameterizedAddMasterTest, TestAddMasterCatchupFromWAL) { ASSERT_OK(CreateTable(cluster_.get(), kTableName)); - // Bring up the new master and add to the cluster. - master_hps.emplace_back(reserved_hp_); - NO_FATALS(StartNewMaster(master_hps, reserved_hp_, orig_num_masters_)); - NO_FATALS(VerifyVoterMasters(orig_num_masters_)); - ASSERT_OK(AddMasterToClusterUsingCLITool(reserved_hp_, nullptr, 4)); - - // Newly added master will be caught up from WAL itself without requiring tablet copy - // since the system catalog is fresh with a single table. - // Catching up from WAL and promotion to VOTER will not be instant after adding the - // new master. Hence using ASSERT_EVENTUALLY. - ASSERT_EVENTUALLY([&] { - VerifyVoterMasters(orig_num_masters_ + 1); - }); - - // Double check by directly contacting the new master. - NO_FATALS(VerifyNewMasterDirectly( - { RaftPeerPB::FOLLOWER, RaftPeerPB::LEADER }, RaftPeerPB::VOTER)); + // Add new master to the cluster. + { + string err; + EnvVars env_vars; + ASSERT_OK(BuildMasterOpts(orig_num_masters_, reserved_hp_, &new_master_opts_, &env_vars)); + ASSERT_OK(AddMasterToClusterUsingCLITool(new_master_opts_, &err, std::move(env_vars), + 4 /* wait_secs */, tools::GetKuduToolAbsolutePath())); + ASSERT_STR_CONTAINS(err, Substitute("Master $0 successfully caught up from WAL.", + new_master_opts_.rpc_bind_address.ToString())); + } // Adding the same master again should print a message but not throw an error. { string err; - ASSERT_OK(AddMasterToClusterUsingCLITool(reserved_hp_, &err)); + ExternalDaemonOptions opts; + EnvVars env_vars; + ASSERT_OK(BuildMasterOpts(orig_num_masters_, reserved_hp_, &opts, &env_vars)); + ASSERT_OK(AddMasterToClusterUsingCLITool(opts, &err, std::move(env_vars))); ASSERT_STR_CONTAINS(err, "Master already present"); } - // Adding one of the former masters should print a message but not throw an error. + // Adding one of the running former masters will throw an error. { string err; - ASSERT_OK(AddMasterToClusterUsingCLITool(master_hps[0], &err)); - ASSERT_STR_CONTAINS(err, "Master already present"); + const auto& hp = master_hps[0]; + ExternalDaemonOptions opts; + EnvVars env_vars; + ASSERT_OK(BuildMasterOpts(0, hp, &opts, &env_vars)); + Status s = AddMasterToClusterUsingCLITool(opts, &err, std::move(env_vars)); + ASSERT_TRUE(s.IsRuntimeError()) << s.ToString(); + ASSERT_STR_CONTAINS(err, Substitute("Master $0 already running", hp.ToString())); } + master_hps.emplace_back(reserved_hp_); NO_FATALS(VerifyClusterAfterMasterAddition(master_hps, orig_num_masters_ + 1)); } @@ -837,52 +814,46 @@ TEST_P(ParameterizedAddMasterTest, TestAddMasterSysCatalogCopy) { NO_FATALS(StartClusterWithSysCatalogGCed(&master_hps)); ASSERT_OK(CreateTable(cluster_.get(), kTableName)); - // Bring up the new master and add to the cluster. - master_hps.emplace_back(reserved_hp_); - NO_FATALS(StartNewMaster(master_hps, reserved_hp_, orig_num_masters_)); - NO_FATALS(VerifyVoterMasters(orig_num_masters_)); - string err; - ASSERT_OK(AddMasterToClusterUsingCLITool(reserved_hp_, &err)); - ASSERT_STR_MATCHES(err, Substitute("New master $0 part of the Raft configuration.*" - "Please follow the next steps which includes system catalog " - "tablet copy", reserved_hp_.ToString())); - // Newly added master will be added to the master Raft config but won't be caught up - // from the WAL and hence remain as a NON_VOTER and transition to FAILED_UNRECOVERABLE state. - NO_FATALS(VerifyNewNonVoterMaster(reserved_hp_, orig_num_masters_ + 1)); + // Add new master to the cluster. + { + string err; + EnvVars env_vars; + ASSERT_OK(BuildMasterOpts(orig_num_masters_, reserved_hp_, &new_master_opts_, &env_vars)); + ASSERT_OK(AddMasterToClusterUsingCLITool(new_master_opts_, &err, std::move(env_vars), + 4 /* wait_secs */)); + ASSERT_STR_CONTAINS(err, Substitute("Master $0 could not be caught up from WAL.", + reserved_hp_.ToString())); + ASSERT_STR_CONTAINS(err, "Successfully copied system catalog and new master is healthy"); + } // Adding the same master again should print a message but not throw an error. { string err; - ASSERT_OK(AddMasterToClusterUsingCLITool(reserved_hp_, &err)); + ExternalDaemonOptions opts; + EnvVars env_vars; + ASSERT_OK(BuildMasterOpts(orig_num_masters_, reserved_hp_, &opts, &env_vars)); + ASSERT_OK(AddMasterToClusterUsingCLITool(opts, &err, std::move(env_vars))); ASSERT_STR_CONTAINS(err, "Master already present"); } - // Adding one of the former masters should print a message but not throw an error. + // Adding one of the running former masters will throw an error. { string err; - ASSERT_OK(AddMasterToClusterUsingCLITool(master_hps[0], &err)); - ASSERT_STR_CONTAINS(err, "Master already present"); + const auto& hp = master_hps[0]; + ExternalDaemonOptions opts; + EnvVars env_vars; + ASSERT_OK(BuildMasterOpts(0, hp, &opts, &env_vars)); + Status s = AddMasterToClusterUsingCLITool(opts, &err, std::move(env_vars)); + ASSERT_TRUE(s.IsRuntimeError()) << s.ToString(); + ASSERT_STR_CONTAINS(err, Substitute("Master $0 already running", hp.ToString())); } - // Without system catalog copy, the new master will remain in the FAILED_UNRECOVERABLE state. - // So lets proceed with the tablet copy process for system catalog. - NO_FATALS(CopySystemCatalog(cluster_->master(0)->bound_rpc_hostport())); - - // Wait for the new master to be up and running and the leader master to send status only Raft - // message to allow the new master to be considered caught up and promoted to be being a VOTER. - ASSERT_EVENTUALLY([&] { - VerifyVoterMasters(orig_num_masters_ + 1); - }); - - // Verify the same state from the new master directly. - // Double check by directly contacting the new master. - NO_FATALS(VerifyNewMasterDirectly( - { RaftPeerPB::FOLLOWER, RaftPeerPB::LEADER }, RaftPeerPB::VOTER)); - + master_hps.emplace_back(reserved_hp_); NO_FATALS(VerifyClusterAfterMasterAddition(master_hps, orig_num_masters_ + 1)); } + class ParameterizedRemoveMasterTest : public DynamicMultiMasterTest, public ::testing::WithParamInterface> { public: @@ -1000,6 +971,7 @@ TEST_P(ParameterizedRemoveMasterTest, TestRemoveMaster) { NO_FATALS(mcv.CheckRowCount(kTableName, ClusterVerifier::EXACTLY, 0)); } + class ParameterizedRecoverMasterTest : public DynamicMultiMasterTest, public ::testing::WithParamInterface { public: @@ -1034,23 +1006,16 @@ TEST_P(ParameterizedRecoverMasterTest, TestRecoverDeadMasterCatchupfromWAL) { HostPort src_master_hp; NO_FATALS(FailAndRemoveFollowerMaster(master_hps, &master_to_recover_idx, &master_to_recover_hp, &src_master_hp)); - - // Add new master at the same HostPort - NO_FATALS(StartNewMaster(master_hps, master_to_recover_hp, master_to_recover_idx)); NO_FATALS(VerifyVoterMasters(orig_num_masters_ - 1)); - ASSERT_OK(AddMasterToClusterUsingCLITool(master_to_recover_hp)); - - // Newly added master will be caught up from WAL itself without requiring tablet copy - // since the system catalog is fresh with a single table. - // Catching up from WAL and promotion to VOTER will not be instant after adding the - // new master. Hence using ASSERT_EVENTUALLY. - ASSERT_EVENTUALLY([&] { - VerifyVoterMasters(orig_num_masters_); - }); - // Double check by directly contacting the new master. - NO_FATALS(VerifyNewMasterDirectly( - { RaftPeerPB::FOLLOWER, RaftPeerPB::LEADER }, RaftPeerPB::VOTER)); + // Add new master at the same HostPort + { + string err; + ASSERT_OK(BuildMasterOpts(master_to_recover_idx, master_to_recover_hp, &new_master_opts_)); + ASSERT_OK(AddMasterToClusterUsingCLITool(new_master_opts_, &err)); + ASSERT_STR_CONTAINS(err, Substitute("Master $0 successfully caught up from WAL.", + new_master_opts_.rpc_bind_address.ToString())); + } NO_FATALS(VerifyClusterAfterMasterAddition(master_hps, orig_num_masters_)); } @@ -1078,30 +1043,15 @@ TEST_P(ParameterizedRecoverMasterTest, TestRecoverDeadMasterSysCatalogCopy) { HostPort src_master_hp; NO_FATALS(FailAndRemoveFollowerMaster(master_hps, &master_to_recover_idx, &master_to_recover_hp, &src_master_hp)); + NO_FATALS(VerifyVoterMasters(orig_num_masters_ - 1)); // Add new master at the same HostPort - NO_FATALS(StartNewMaster(master_hps, master_to_recover_hp, master_to_recover_idx)); - NO_FATALS(VerifyVoterMasters(orig_num_masters_ - 1)); string err; - ASSERT_OK(AddMasterToClusterUsingCLITool(master_to_recover_hp, &err)); - ASSERT_STR_MATCHES(err, Substitute("New master $0 part of the Raft configuration.*" - "Please follow the next steps which includes system catalog " - "tablet copy", master_to_recover_hp.ToString())); - - // Verify the new master will remain as NON_VOTER and transition to FAILED_UNRECOVERABLE state. - NO_FATALS(VerifyNewNonVoterMaster(master_to_recover_hp, orig_num_masters_)); - - // Without system catalog copy, the new master will remain in the FAILED_UNRECOVERABLE state. - // So lets proceed with the tablet copy process for system catalog. - NO_FATALS(CopySystemCatalog(src_master_hp)); - - // Wait for the new master to be up and running and the leader master to send status only Raft - // message to allow the new master to be considered caught up and promoted to be being a VOTER. - ASSERT_EVENTUALLY([&] { - VerifyVoterMasters(orig_num_masters_); - }); - - VerifyNewMasterDirectly({ RaftPeerPB::FOLLOWER, RaftPeerPB::LEADER }, RaftPeerPB::VOTER); + ASSERT_OK(BuildMasterOpts(master_to_recover_idx, master_to_recover_hp, &new_master_opts_)); + ASSERT_OK(AddMasterToClusterUsingCLITool(new_master_opts_, &err)); + ASSERT_STR_CONTAINS(err, Substitute("Master $0 could not be caught up from WAL.", + master_to_recover_hp.ToString())); + ASSERT_STR_CONTAINS(err, "Successfully copied system catalog and new master is healthy"); NO_FATALS(VerifyClusterAfterMasterAddition(master_hps, orig_num_masters_)); } @@ -1111,21 +1061,16 @@ TEST_P(ParameterizedRecoverMasterTest, TestRecoverDeadMasterSysCatalogCopy) { // expected to fail due to invalid Raft config. TEST_F(DynamicMultiMasterTest, TestAddMasterWithNoLastKnownAddr) { NO_FATALS(SetUpWithNumMasters(1)); - NO_FATALS( - StartCluster({"--master_support_change_config"}, false /* supply_single_master_addr */)); - - // Verify that existing masters are running as VOTERs and collect their addresses to be used - // for starting the new master. - vector master_hps; - NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps)); + NO_FATALS(StartCluster({"--master_support_change_config"}, false/* supply_single_master_addr */)); - // Bring up the new master and add to the cluster. - master_hps.emplace_back(reserved_hp_); - NO_FATALS(StartNewMaster(master_hps, reserved_hp_, orig_num_masters_)); + // Verify that existing masters are running as VOTERs. NO_FATALS(VerifyVoterMasters(orig_num_masters_)); + // Add master to the cluster. string err; - Status actual = AddMasterToClusterUsingCLITool(reserved_hp_, &err); + ExternalDaemonOptions opts; + ASSERT_OK(BuildMasterOpts(orig_num_masters_, reserved_hp_, &opts)); + Status actual = AddMasterToClusterUsingCLITool(opts, &err); ASSERT_TRUE(actual.IsRuntimeError()) << actual.ToString(); ASSERT_STR_MATCHES(err, "'last_known_addr' field in single master Raft configuration not set. " "Please restart master with --master_addresses flag"); @@ -1140,19 +1085,14 @@ TEST_F(DynamicMultiMasterTest, TestAddMasterFeatureFlagNotSpecified) { NO_FATALS(SetUpWithNumMasters(1)); NO_FATALS(StartCluster({ "--master_support_change_config=false" })); - // Verify that existing masters are running as VOTERs and collect their addresses to be used - // for starting the new master. - vector master_hps; - NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps)); - - // Bring up the new master and add to the cluster. - master_hps.emplace_back(reserved_hp_); - NO_FATALS(StartNewMaster(master_hps, reserved_hp_, orig_num_masters_, - false /* master_supports_change_config */)); + // Verify that existing masters are running as VOTERs. NO_FATALS(VerifyVoterMasters(orig_num_masters_)); + // Add master to the cluster. string err; - Status actual = AddMasterToClusterUsingCLITool(reserved_hp_, &err); + ExternalDaemonOptions opts; + ASSERT_OK(BuildMasterOpts(orig_num_masters_, reserved_hp_, &opts)); + Status actual = AddMasterToClusterUsingCLITool(opts, &err); ASSERT_TRUE(actual.IsRuntimeError()) << actual.ToString(); ASSERT_STR_MATCHES(err, "Cluster does not support AddMaster"); @@ -1208,7 +1148,8 @@ TEST_F(DynamicMultiMasterTest, TestAddMasterToNonLeader) { // Bring up the new master and add to the cluster. master_hps.emplace_back(reserved_hp_); - NO_FATALS(StartNewMaster(master_hps, reserved_hp_, orig_num_masters_)); + scoped_refptr master; + NO_FATALS(StartNewMaster(master_hps, reserved_hp_, orig_num_masters_, &master)); NO_FATALS(VerifyVoterMasters(orig_num_masters_)); // Verify sending add master request to a non-leader master returns NOT_THE_LEADER error. @@ -1275,20 +1216,15 @@ TEST_F(DynamicMultiMasterTest, TestAddMasterMissingAndIncorrectAddress) { NO_FATALS(SetUpWithNumMasters(1)); NO_FATALS(StartCluster({"--master_support_change_config"})); - // Verify that existing masters are running as VOTERs and collect their addresses to be used - // for starting the new master. - vector master_hps; - NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps)); - - // Bring up the new master and add to the cluster. - master_hps.emplace_back(reserved_hp_); - NO_FATALS(StartNewMaster(master_hps, reserved_hp_, orig_num_masters_)); + // Verify that existing masters are running as VOTERs. NO_FATALS(VerifyVoterMasters(orig_num_masters_)); // Empty HostPort { string err; - Status actual = AddMasterToClusterUsingCLITool(HostPort(), &err); + ExternalDaemonOptions opts; + ASSERT_OK(BuildMasterOpts(orig_num_masters_, HostPort(), &opts)); + Status actual = AddMasterToClusterUsingCLITool(opts, &err); ASSERT_TRUE(actual.IsRuntimeError()) << actual.ToString(); ASSERT_STR_CONTAINS(err, "must provide positional argument master_address"); } @@ -1296,15 +1232,16 @@ TEST_F(DynamicMultiMasterTest, TestAddMasterMissingAndIncorrectAddress) { // Non-routable incorrect hostname. { string err; - Status actual = AddMasterToClusterUsingCLITool( - HostPort("non-existent-path.local", Master::kDefaultPort), &err); + ExternalDaemonOptions opts; + ASSERT_OK(BuildMasterOpts(orig_num_masters_, + HostPort("non-existent-path.local", Master::kDefaultPort), &opts)); + Status actual = AddMasterToClusterUsingCLITool(opts, &err); ASSERT_TRUE(actual.IsRuntimeError()) << actual.ToString(); - ASSERT_STR_CONTAINS(err, - "Network error: unable to resolve address for non-existent-path.local"); + ASSERT_STR_CONTAINS(err, "unable to resolve address for non-existent-path.local"); } // Verify no change in number of masters. - NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps)); + NO_FATALS(VerifyVoterMasters(orig_num_masters_)); } // Test that attempts to remove a master with missing master address and a non-existent @@ -1369,6 +1306,27 @@ TEST_F(DynamicMultiMasterTest, TestRemoveMasterMismatchHostnameAndUuid) { NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps)); } +// Test that attempts to add a master with non-existent kudu executable path. +TEST_F(DynamicMultiMasterTest, TestAddMasterIncorrectKuduBinary) { + NO_FATALS(SetUpWithNumMasters(1)); + NO_FATALS(StartCluster({ "--master_support_change_config" })); + + // Verify that existing masters are running as VOTERs. + NO_FATALS(VerifyVoterMasters(orig_num_masters_)); + + // Add master to the cluster. + string err; + string kudu_abs_path = "/tmp/path/to/nowhere"; + ExternalDaemonOptions opts; + ASSERT_OK(BuildMasterOpts(orig_num_masters_, reserved_hp_, &opts)); + Status actual = AddMasterToClusterUsingCLITool(opts, &err, {} /* env_vars */, 4, kudu_abs_path); + ASSERT_TRUE(actual.IsRuntimeError()) << actual.ToString(); + ASSERT_STR_CONTAINS(err, Substitute("kudu binary not found at $0", kudu_abs_path)); + + // Verify no change in number of masters. + NO_FATALS(VerifyVoterMasters(orig_num_masters_)); +} + // Test that attempts removing a leader master itself from a cluster with // 1 or 2 masters. class ParameterizedRemoveLeaderMasterTest : public DynamicMultiMasterTest, diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc b/src/kudu/mini-cluster/external_mini_cluster.cc index bca138b5bd..74e56a585e 100644 --- a/src/kudu/mini-cluster/external_mini_cluster.cc +++ b/src/kudu/mini-cluster/external_mini_cluster.cc @@ -90,6 +90,7 @@ using kudu::tserver::ListTabletsResponsePB; using kudu::tserver::TabletServerAdminServiceProxy; using kudu::tserver::TabletServerServiceProxy; using std::copy; +using std::map; using std::pair; using std::string; using std::unique_ptr; @@ -949,20 +950,6 @@ string ExternalMiniCluster::UuidForTS(int ts_idx) const { return tablet_server(ts_idx)->uuid(); } -Status ExternalMiniCluster::AddMaster(const scoped_refptr& new_master) { - const auto& new_master_uuid = new_master->instance_id().permanent_uuid(); - for (const auto& m : masters_) { - if (m->instance_id().permanent_uuid() == new_master_uuid) { - CHECK(m->bound_rpc_hostport() == new_master->bound_rpc_hostport()); - return Status::AlreadyPresent(Substitute( - "Master $0, uuid: $1 already present in ExternalMiniCluster", - m->bound_rpc_hostport().ToString(), new_master_uuid)); - } - } - masters_.emplace_back(new_master); - return Status::OK(); -} - Status ExternalMiniCluster::RemoveMaster(const HostPort& hp) { for (auto it = masters_.begin(); it != masters_.end(); ++it) { if ((*it)->bound_rpc_hostport() == hp) { @@ -1165,25 +1152,35 @@ void ExternalDaemon::SetMetastoreIntegration(const string& hms_uris, opts_.extra_flags.emplace_back(Substitute("--hive_metastore_sasl_enabled=$0", enable_kerberos)); } -Status ExternalDaemon::EnableKerberos(MiniKdc* kdc, - const string& principal, - const string& bind_host) { - string spn = principal + "/" + bind_host; +Status ExternalDaemon::CreateKerberosConfig(MiniKdc* kdc, + const string& principal_base, + const string& bind_host, + vector* flags, + map* env_vars) { + string spn = principal_base + "/" + bind_host; string ktpath; RETURN_NOT_OK_PREPEND(kdc->CreateServiceKeytab(spn, &ktpath), "could not create keytab"); - extra_env_ = kdc->GetEnvVars(); + *env_vars = kdc->GetEnvVars(); + *flags = { + Substitute("--keytab_file=$0", ktpath), + Substitute("--principal=$0", spn), + "--rpc_authentication=required", + "--superuser_acl=test-admin", + "--user_acl=test-user", + }; + return Status::OK(); +} + +Status ExternalDaemon::EnableKerberos(MiniKdc* kdc, const string& principal_base, + const string& bind_host) { + vector flags; + RETURN_NOT_OK(CreateKerberosConfig(kdc, principal_base, bind_host, &flags, &extra_env_)); // Insert Kerberos flags at the front of extra_flags, so that user specified // flags will override them. - opts_.extra_flags.insert(opts_.extra_flags.begin(), { - Substitute("--keytab_file=$0", ktpath), - Substitute("--principal=$0", spn), - "--rpc_authentication=required", - "--superuser_acl=test-admin", - "--user_acl=test-user", - }); - + opts_.extra_flags.insert(opts_.extra_flags.begin(), std::make_move_iterator(flags.begin()), + std::make_move_iterator(flags.end())); return Status::OK(); } diff --git a/src/kudu/mini-cluster/external_mini_cluster.h b/src/kudu/mini-cluster/external_mini_cluster.h index 0823384c71..f4bd5fca3d 100644 --- a/src/kudu/mini-cluster/external_mini_cluster.h +++ b/src/kudu/mini-cluster/external_mini_cluster.h @@ -480,10 +480,6 @@ class ExternalMiniCluster : public MiniCluster { // files that reside in the log dir. std::string GetLogPath(const std::string& daemon_id) const; - // Adds a master to the ExternalMiniCluster when the new master has been added - // dynamically after bringing up the ExternalMiniCluster. - Status AddMaster(const scoped_refptr& master); - // Removes any bookkeeping of the master specified by 'hp' from the ExternalMiniCluster // after already having run through a successful master Raft change config to remove it. // This helps keep the state of the actual cluster in sync with the state in ExternalMiniCluster. @@ -566,6 +562,15 @@ class ExternalDaemon : public RefCountedThreadSafe { void SetMetastoreIntegration(const std::string& hms_uris, bool enable_kerberos); + // Create a Kerberos principal and keytab using the 'principal_base' and 'bind_host' hostname + // of the form /. + // Returns the generated keytab file and service principal as 'flags' and the appropriate + // environment variables in 'env_vars' output parameters. + static Status CreateKerberosConfig(MiniKdc* kdc, const std::string& principal_base, + const std::string& bind_host, + std::vector* flags, + std::map* env_vars); + // Enable Kerberos for this daemon. This creates a Kerberos principal // and keytab, and sets the appropriate environment variables in the // subprocess such that the server will use Kerberos authentication. diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc index 326a4f42cc..c4755114bf 100644 --- a/src/kudu/tools/kudu-tool-test.cc +++ b/src/kudu/tools/kudu-tool-test.cc @@ -54,11 +54,11 @@ #include "kudu/common/common.pb.h" #include "kudu/common/partial_row.h" #include "kudu/common/partition.h" +#include "kudu/common/row_operations.pb.h" #include "kudu/common/schema.h" #include "kudu/common/types.h" #include "kudu/common/wire_protocol-test-util.h" #include "kudu/common/wire_protocol.h" -#include "kudu/common/wire_protocol.pb.h" #include "kudu/consensus/consensus.pb.h" #include "kudu/consensus/log.h" #include "kudu/consensus/log_util.h" @@ -1107,8 +1107,8 @@ TEST_F(ToolTest, TestModeHelp) { "status.*Get the status", "timestamp.*Get the current timestamp", "list.*List masters in a Kudu cluster", - "add.*Add a master to the Raft configuration", - "remove.*Remove a master from the Raft configuration" + "add.*Add a master to the Kudu cluster", + "remove.*Remove a master from the Kudu cluster" }; NO_FATALS(RunTestHelp(kCmd, kMasterModeRegexes)); NO_FATALS(RunTestHelpRpcFlags(kCmd, @@ -1130,9 +1130,10 @@ TEST_F(ToolTest, TestModeHelp) { } { NO_FATALS(RunTestHelp("master add --help", - {"-wait_secs \\(Timeout in seconds to wait for the newly added master"})); + {"-wait_secs.*\\(Timeout in seconds to wait while retrying operations", + "-kudu_abs_path.*\\(Absolute file path of the 'kudu' executable used"})); NO_FATALS(RunTestHelp("master remove --help", - {"-master_uuid \\(Permanent UUID of the master"})); + {"-master_uuid.*\\(Permanent UUID of the master"})); } { const vector kPbcModeRegexes = { diff --git a/src/kudu/tools/tool_action_common.cc b/src/kudu/tools/tool_action_common.cc index 626b737d09..4d9fdfdb98 100644 --- a/src/kudu/tools/tool_action_common.cc +++ b/src/kudu/tools/tool_action_common.cc @@ -767,6 +767,20 @@ Status DumpMemTrackers(const string& address, uint16_t default_port) { return Status::OK(); } +Status GetKuduToolAbsolutePathSafe(string* path) { + static const char* const kKuduCtlFileName = "kudu"; + string exe; + RETURN_NOT_OK(Env::Default()->GetExecutablePath(&exe)); + const string binroot = DirName(exe); + string tool_abs_path = JoinPathSegments(binroot, kKuduCtlFileName); + if (!Env::Default()->FileExists(tool_abs_path)) { + return Status::NotFound(Substitute( + "$0 binary not found at $1", kKuduCtlFileName, tool_abs_path)); + } + *path = std::move(tool_abs_path); + return Status::OK(); +} + namespace { // Pretty print a table using the psql format. For example: diff --git a/src/kudu/tools/tool_action_common.h b/src/kudu/tools/tool_action_common.h index c97ea4e5c5..45f73301b5 100644 --- a/src/kudu/tools/tool_action_common.h +++ b/src/kudu/tools/tool_action_common.h @@ -216,6 +216,10 @@ Status ParseMasterAddresses( const RunnerContext& context, std::vector* master_addresses); +// Get full path to the top-level 'kudu' tool binary in the output 'path' parameter. +// Returns appropriate error if the binary is not found. +Status GetKuduToolAbsolutePathSafe(std::string* path); + // Parses a comma separated list of "host:port" pairs into an unordered set of // HostPort objects. If no port is specified for an entry in the comma separated list, // the default master port is used for that entry's pair. diff --git a/src/kudu/tools/tool_action_master.cc b/src/kudu/tools/tool_action_master.cc index 7826fa5a51..4b61e4f309 100644 --- a/src/kudu/tools/tool_action_master.cc +++ b/src/kudu/tools/tool_action_master.cc @@ -16,7 +16,9 @@ // under the License. #include +#include #include +#include // IWYU pragma: keep #include #include #include @@ -45,24 +47,37 @@ #include "kudu/master/master.pb.h" #include "kudu/master/master.proxy.h" #include "kudu/master/master_runner.h" +#include "kudu/master/sys_catalog.h" #include "kudu/rpc/response_callback.h" #include "kudu/rpc/rpc_controller.h" +#include "kudu/tools/ksck.h" +#include "kudu/tools/ksck_remote.h" #include "kudu/tools/tool_action.h" #include "kudu/tools/tool_action_common.h" +#include "kudu/util/env.h" +#include "kudu/util/flags.h" #include "kudu/util/init.h" #include "kudu/util/monotime.h" #include "kudu/util/net/net_util.h" +#include "kudu/util/scoped_cleanup.h" #include "kudu/util/status.h" #include "kudu/util/string_case.h" +#include "kudu/util/subprocess.h" DECLARE_bool(force); +DECLARE_int64(negotiation_timeout_ms); DECLARE_int64(timeout_ms); DECLARE_string(columns); +DECLARE_string(fs_wal_dir); +DECLARE_string(fs_data_dirs); DEFINE_string(master_uuid, "", "Permanent UUID of the master. Only needed to disambiguate in case " "of multiple masters with same RPC address"); DEFINE_int64(wait_secs, 8, - "Timeout in seconds to wait for the newly added master to be promoted as VOTER."); + "Timeout in seconds to wait while retrying operations like bringing up new master, " + "running ksck, waiting for the new master to be promoted as VOTER, etc."); +DEFINE_string(kudu_abs_path, "", "Absolute file path of the 'kudu' executable used to bring up " + "new master and other workflow steps."); using kudu::master::AddMasterRequestPB; using kudu::master::AddMasterResponsePB; @@ -127,49 +142,90 @@ Status MasterTimestamp(const RunnerContext& context) { return PrintServerTimestamp(address, Master::kDefaultPort); } -Status AddMasterChangeConfig(const RunnerContext& context) { - const string& new_master_address = FindOrDie(context.required_args, kMasterAddressArg); +// Bring up new master at the specified HostPort 'hp' with user-supplied 'flags' using kudu +// executable 'kudu_abs_path'. +// 'master_addresses' is list of masters in existing cluster including the new +// master. +Status BringUpNewMaster(const string& kudu_abs_path, + vector flags, + const HostPort& hp, + const vector& master_addresses, + unique_ptr* new_master_out) { + // Ensure the new master is not already running at the specified RPC address. + unique_ptr proxy; + RETURN_NOT_OK_PREPEND(BuildProxy(hp.ToString(), Master::kDefaultPort, &proxy), + "Failed building proxy for new master"); + + auto is_catalog_mngr_running = [](MasterServiceProxy* master_proxy) { + master::GetMasterRegistrationRequestPB req; + master::GetMasterRegistrationResponsePB resp; + RpcController rpc; + Status s = master_proxy->GetMasterRegistration(req, &resp, &rpc); + return s.ok() && !resp.has_error(); + }; - LeaderMasterProxy proxy; - RETURN_NOT_OK(proxy.Init(context)); + if (is_catalog_mngr_running(proxy.get())) { + return Status::IllegalState(Substitute("Master $0 already running", hp.ToString())); + } - HostPort hp; - RETURN_NOT_OK(hp.ParseString(new_master_address, Master::kDefaultPort)); - { - AddMasterRequestPB req; - AddMasterResponsePB resp; - *req.mutable_rpc_addr() = HostPortToPB(hp); - - Status s = proxy.SyncRpc( - req, &resp, "AddMaster", &MasterServiceProxy::AddMasterAsync, - {master::MasterFeatures::DYNAMIC_MULTI_MASTER}); - // It's possible this is a retry request in which case instead of returning - // the master is already present in the Raft config error we make further checks - // whether the master has been promoted to a VOTER. - bool master_already_present = - resp.has_error() && resp.error().code() == master::MasterErrorPB::MASTER_ALREADY_PRESENT; - if (!s.ok() && !master_already_present) { - return s; + flags.emplace_back("--master_addresses=" + JoinStrings(master_addresses, ",")); + flags.emplace_back("--master_address_add_new_master=" + hp.ToString()); + flags.emplace_back("--master_support_change_config"); + vector argv = { kudu_abs_path, "master", "run" }; + argv.insert(argv.end(), std::make_move_iterator(flags.begin()), + std::make_move_iterator(flags.end())); + auto new_master = std::make_unique(argv); + RETURN_NOT_OK_PREPEND(new_master->Start(), "Failed starting new master"); + auto stop_master = MakeScopedCleanup([&] { + WARN_NOT_OK(new_master->KillAndWait(SIGKILL), "Failed stopping new master"); + }); + MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(FLAGS_wait_secs); + do { + Status wait_status = new_master->WaitNoBlock(); + if (!wait_status.IsTimedOut()) { + return Status::RuntimeError("Failed to bring up new master"); } - if (master_already_present) { - LOG(INFO) << "Master already present. Checking for promotion to VOTER..."; + if (is_catalog_mngr_running(proxy.get())) { + stop_master.cancel(); + *new_master_out = std::move(new_master); + return Status::OK(); } - } + SleepFor(MonoDelta::FromMilliseconds(100)); + } while (MonoTime::Now() < deadline); - // If the system catalog of the new master can be caught up from the WAL then the new master will - // be promoted to a VOTER and become a FOLLOWER. However this can take some time, so we'll - // try for a few seconds. It's perfectly normal for the new master to be not caught up from - // the WAL in which case subsequent steps of system catalog tablet copy need to be carried out - // as outlined in the documentation for adding a new master to Kudu cluster. + return Status::TimedOut("Timed out waiting for the new master to come up"); +} + +// Check health and consensus status of masters in ksck. +Status CheckMastersHealthy(const vector& master_addresses) { + std::shared_ptr cluster; + RETURN_NOT_OK(RemoteKsckCluster::Build(master_addresses, &cluster)); + + // Print to an unopened ofstream to discard ksck output. + // See https://stackoverflow.com/questions/8243743. + std::ofstream null_stream; + Ksck ksck(cluster, &null_stream); + RETURN_NOT_OK(ksck.CheckMasterHealth()); + RETURN_NOT_OK(ksck.CheckMasterConsensus()); + return Status::OK(); +} + +// Check new master 'new_master_hp' is promoted to a VOTER and all masters are healthy. +// Returns the last Raft role and member_type in output parameters 'master_role' and 'master_type' +// respectively. +Status CheckMasterVoterAndHealthy(LeaderMasterProxy* proxy, + const vector& master_addresses, + const HostPort& new_master_hp, + RaftPeerPB::Role* master_role, + RaftPeerPB::MemberType* master_type) { + *master_role = RaftPeerPB::UNKNOWN_ROLE; + *master_type = RaftPeerPB::UNKNOWN_MEMBER_TYPE; MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(FLAGS_wait_secs); - RaftPeerPB::Role master_role = RaftPeerPB::UNKNOWN_ROLE; - RaftPeerPB::MemberType master_type = RaftPeerPB::UNKNOWN_MEMBER_TYPE; do { ListMastersRequestPB req; ListMastersResponsePB resp; - RETURN_NOT_OK((proxy.SyncRpc( - req, &resp, "ListMasters", &MasterServiceProxy::ListMastersAsync))); - + RETURN_NOT_OK((proxy->SyncRpc( + req, &resp, "ListMasters", &MasterServiceProxy::ListMastersAsync))); if (resp.has_error()) { return StatusFromPB(resp.error().status()); } @@ -184,7 +240,7 @@ Status AddMasterChangeConfig(const RunnerContext& context) { continue; } for (const auto& master_hp : master.registration().rpc_addresses()) { - if (hp == HostPortFromPB(master_hp)) { + if (new_master_hp == HostPortFromPB(master_hp)) { // Found the newly added master new_master_found = true; break; @@ -196,29 +252,212 @@ Status AddMasterChangeConfig(const RunnerContext& context) { } if (!new_master_found) { return Status::NotFound(Substitute("New master $0 not found. Retry adding the master", - hp.ToString())); + new_master_hp.ToString())); } CHECK_LT(i, resp.masters_size()); const auto& master = resp.masters(i); - master_role = master.role(); - master_type = master.member_type(); - if (master_type == RaftPeerPB::VOTER && - (master_role == RaftPeerPB::FOLLOWER || master_role == RaftPeerPB::LEADER)) { - LOG(INFO) << Substitute("Successfully added master $0 to the cluster. Please follow the " - "next steps which includes updating master addresses, updating " - "client configuration etc. from the Kudu administration " - "documentation on \"Migrating to Multiple Kudu Masters\".", - hp.ToString()); - return Status::OK(); + *master_role = master.role(); + *master_type = master.member_type(); + if (*master_type == RaftPeerPB::VOTER && + (*master_role == RaftPeerPB::FOLLOWER || *master_role == RaftPeerPB::LEADER)) { + // Check the master ksck state as well. + if (CheckMastersHealthy(master_addresses).ok()) { + return Status::OK(); + } } SleepFor(MonoDelta::FromMilliseconds(100)); } while (MonoTime::Now() < deadline); - LOG(INFO) << Substitute("New master $0 part of the Raft configuration; role: $1, member_type: " - "$2. Please follow the next steps which includes system catalog tablet " - "copy, updating master addresses etc. from the Kudu administration " - "documentation on \"Migrating to Multiple Kudu Masters\".", - hp.ToString(), master_role, master_type); + return Status::TimedOut("Timed out waiting for master to catch up from WAL"); +} + +// Deletes local system catalog on 'dst_master' and copies system catalog from one of the masters +// in 'master_addresses' which includes the dst_master using the kudu executable 'kudu_abs_path'. +Status CopyRemoteSystemCatalog(const string& kudu_abs_path, + const HostPort& dst_master, + const vector& master_addresses) { + // Find source master to copy system catalog from. + string src_master; + const auto& dst_master_str = dst_master.ToString(); + for (const auto& addr : master_addresses) { + if (addr != dst_master_str) { + src_master = addr; + break; + } + } + if (src_master.empty()) { + return Status::RuntimeError("Failed to find source master to copy system catalog"); + } + + LOG(INFO) << Substitute("Deleting system catalog on $0", dst_master_str); + RETURN_NOT_OK_PREPEND( + Subprocess::Call( + { kudu_abs_path, "local_replica", "delete", master::SysCatalogTable::kSysCatalogTabletId, + "--fs_wal_dir=" + FLAGS_fs_wal_dir, "--fs_data_dirs=" + FLAGS_fs_data_dirs, + "-clean_unsafe" }), + "Failed to delete system catalog"); + + LOG(INFO) << Substitute("Copying system catalog from master $0", src_master); + RETURN_NOT_OK_PREPEND( + Subprocess::Call( + { kudu_abs_path, "local_replica", "copy_from_remote", + master::SysCatalogTable::kSysCatalogTabletId, src_master, + "--fs_wal_dir=" + FLAGS_fs_wal_dir, "--fs_data_dirs=" + FLAGS_fs_data_dirs }), + "Failed to copy system catalog"); + + return Status::OK(); +} + +// Invoke the add master Raft change config RPC to add the supplied master 'hp'. +// If the master is already present then Status::OK is returned and a log message is printed. +Status AddMasterRaftConfig(const RunnerContext& context, + const HostPort& hp) { + LeaderMasterProxy proxy; + RETURN_NOT_OK(proxy.Init(context)); + AddMasterRequestPB req; + AddMasterResponsePB resp; + *req.mutable_rpc_addr() = HostPortToPB(hp); + + Status s = proxy.SyncRpc( + req, &resp, "AddMaster", &MasterServiceProxy::AddMasterAsync, + {master::MasterFeatures::DYNAMIC_MULTI_MASTER}); + // It's possible this is a retry request in which case instead of returning + // the master is already present in the Raft config error we make further checks + // whether the master has been promoted to a VOTER. + bool master_already_present = + resp.has_error() && resp.error().code() == master::MasterErrorPB::MASTER_ALREADY_PRESENT; + if (!s.ok() && !master_already_present) { + return s; + } + if (master_already_present) { + LOG(INFO) << "Master already present."; + } + LOG(INFO) << Substitute("Successfully added master $0 to the Raft configuration.", + hp.ToString()); + return Status::OK(); +} + +Status AddMaster(const RunnerContext& context) { + static const char* const kPostSuccessMsg = + "Please follow the next steps which includes updating master addresses, updating client " + "configuration etc. from the Kudu administration documentation on " + "\"Migrating to Multiple Kudu Masters\"."; + static const char* const kFailedStopMsg = + "Failed to stop new master after successfully adding to the cluster. Stop the master before " + "proceeding further."; + + // Parse input arguments. + vector master_addresses; + RETURN_NOT_OK(ParseMasterAddresses(context, &master_addresses)); + const string& new_master_address = FindOrDie(context.required_args, kMasterAddressArg); + HostPort hp; + RETURN_NOT_OK(hp.ParseString(new_master_address, Master::kDefaultPort)); + + // Check for the 'kudu' executable if the user has supplied one. + if (!FLAGS_kudu_abs_path.empty() && !Env::Default()->FileExists(FLAGS_kudu_abs_path)) { + return Status::NotFound(Substitute("kudu binary not found at $0", FLAGS_kudu_abs_path)); + } + string kudu_abs_path = FLAGS_kudu_abs_path; + if (kudu_abs_path.empty()) { + RETURN_NOT_OK(GetKuduToolAbsolutePathSafe(&kudu_abs_path)); + } + + // Get the flags that'll be needed to bring up new master and for system catalog copy. + if (FLAGS_fs_wal_dir.empty()) { + return Status::InvalidArgument("Flag -fs_wal_dir not supplied"); + } + if (FLAGS_fs_data_dirs.empty()) { + return Status::InvalidArgument("Flag -fs_data_dirs not supplied"); + } + GFlagsMap flags_map = GetFlagsMap(); + // Remove the optional parameters for this command. + // Remaining optional flags need to be passed to the new master. + flags_map.erase("wait_secs"); + flags_map.erase("kudu_abs_path"); + + // Bring up the new master. + vector new_master_flags; + new_master_flags.reserve(flags_map.size()); + for (const auto& name_flag_pair : flags_map) { + const auto& flag = name_flag_pair.second; + new_master_flags.emplace_back(Substitute("--$0=$1", flag.name, flag.current_value)); + } + + // Bring up the new master that includes master addresses of the cluster and itself. + // It's possible this is a retry in which case the new master is already part of + // master_addresses. + if (std::find(master_addresses.begin(), master_addresses.end(), hp.ToString()) == + master_addresses.end()) { + master_addresses.emplace_back(hp.ToString()); + } + + unique_ptr new_master; + RETURN_NOT_OK_PREPEND(BringUpNewMaster(kudu_abs_path, new_master_flags, hp, master_addresses, + &new_master), + "Failed bringing up new master. See Kudu Master log for details."); + auto stop_master = MakeScopedCleanup([&] { + auto* master_ptr = new_master.get(); + if (master_ptr != nullptr) { + WARN_NOT_OK(master_ptr->KillAndWait(SIGKILL), "Failed stopping new master"); + } + }); + // Call the add master Raft change config RPC. + RETURN_NOT_OK(AddMasterRaftConfig(context, hp)); + + // If the system catalog of the new master can be caught up from the WAL then the new master will + // transition to FAILED_UNRECOVERABLE state. However this can take some time, so we'll + // try for a few seconds. It's perfectly normal for the new master to be not caught up from + // the WAL in which case subsequent steps of system catalog tablet copy need to be carried out. + RaftPeerPB::Role master_role; + RaftPeerPB::MemberType master_type; + LeaderMasterProxy proxy; + // Since new master is now part of the Raft configuration, use updated 'master_addresses'. + RETURN_NOT_OK(proxy.Init( + master_addresses, MonoDelta::FromMilliseconds(FLAGS_timeout_ms), + MonoDelta::FromMilliseconds(FLAGS_negotiation_timeout_ms))); + + LOG(INFO) << "Checking for master consensus and health status..."; + Status wal_catchup_status = CheckMasterVoterAndHealthy(&proxy, master_addresses, hp, + &master_role, &master_type); + if (wal_catchup_status.ok()) { + stop_master.cancel(); + RETURN_NOT_OK_PREPEND(new_master->KillAndWait(SIGTERM), kFailedStopMsg); + LOG(INFO) << Substitute("Master $0 successfully caught up from WAL.", hp.ToString()); + LOG(INFO) << kPostSuccessMsg; + return Status::OK(); + } + if (!wal_catchup_status.IsTimedOut()) { + RETURN_NOT_OK_PREPEND(wal_catchup_status, + "Unexpected error waiting for master to catchup from WAL."); + return wal_catchup_status; + } + + // New master could not be caught up from WAL, so next we'll attempt copying + // system catalog. + LOG(INFO) << Substitute("Master $0 status; role: $1, member_type: $2", + hp.ToString(), RaftPeerPB::Role_Name(master_role), + RaftPeerPB::MemberType_Name(master_type)); + LOG(INFO) << Substitute("Master $0 could not be caught up from WAL.", hp.ToString()); + + RETURN_NOT_OK_PREPEND(new_master->KillAndWait(SIGTERM), + "Unable to stop master to proceed with system catalog copy"); + new_master.reset(); + + RETURN_NOT_OK(CopyRemoteSystemCatalog(kudu_abs_path, hp, master_addresses)); + + RETURN_NOT_OK_PREPEND(BringUpNewMaster(kudu_abs_path, new_master_flags, hp, master_addresses, + &new_master), + "Failed bringing up new master after system catalog copy. " + "See Kudu Master log for details."); + + RETURN_NOT_OK_PREPEND(CheckMasterVoterAndHealthy(&proxy, master_addresses, hp, &master_role, + &master_type), + "Failed waiting for new master to be healthy after system catalog copy"); + + stop_master.cancel(); + RETURN_NOT_OK_PREPEND(new_master->KillAndWait(SIGTERM), kFailedStopMsg); + LOG(INFO) << "Successfully copied system catalog and new master is healthy."; + LOG(INFO) << kPostSuccessMsg; return Status::OK(); } @@ -560,24 +799,44 @@ unique_ptr BuildMasterMode() { } { unique_ptr add_master = - ActionBuilder("add", &AddMasterChangeConfig) - .Description("Add a master to the Raft configuration of the Kudu cluster. " - "Please refer to the Kudu administration documentation on " - "\"Migrating to Multiple Kudu Masters\" for the complete steps.") + ActionBuilder("add", &AddMaster) + .Description("Add a master to the Kudu cluster") + .ExtraDescription( + "This is an advanced command that orchestrates the workflow to bring up and add a " + "new master to the Kudu cluster. It must be run locally on the new master being added " + "and not on the leader master. This tool shuts down the new master after completion " + "of the command regardless of whether the new master addition is successful. " + "After the command completes successfully, user is expected to explicitly " + "start the new master using the same flags as supplied to this tool.\n\n" + "Supply all the necessary flags to bring up the new master. " + "The most common configuration flags used to bring up a master are described " + "below. See \"Kudu Configuration Reference\" for all the configuration options that " + "apply to a Kudu master.\n\n" + "Please refer to the Kudu administration documentation on " + "\"Migrating to Multiple Kudu Masters\" for the complete steps.") .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc }) .AddRequiredParameter({ kMasterAddressArg, kMasterAddressDesc }) .AddOptionalParameter("wait_secs") + .AddOptionalParameter("kudu_abs_path") + .AddOptionalParameter("fs_wal_dir") + .AddOptionalParameter("fs_data_dirs") + .AddOptionalParameter("fs_metadata_dir") + .AddOptionalParameter("log_dir") + // Unlike most tools we don't log to stderr by default to match the + // kudu-master binary as closely as possible. + .AddOptionalParameter("logtostderr", string("false")) .Build(); builder.AddAction(std::move(add_master)); } { unique_ptr remove_master = ActionBuilder("remove", &RemoveMasterChangeConfig) - .Description("Remove a master from the Raft configuration of the Kudu cluster. " - "Please refer to the Kudu administration documentation on " - "\"Removing Kudu Masters from a Multi-Master Deployment\" or " - "\"Recovering from a dead Kudu Master in a Multi-Master Deployment\" " - "as appropriate.") + .Description("Remove a master from the Kudu cluster") + .ExtraDescription( + "Removes a master from the Raft configuration of the Kudu cluster.\n\n" + "Please refer to the Kudu administration documentation on " + "\"Removing Kudu Masters from a Multi-Master Deployment\" or " + "\"Recovering from a dead Kudu Master in a Multi-Master Deployment\" as appropriate.") .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc }) .AddRequiredParameter({ kMasterAddressArg, kMasterAddressDesc }) .AddOptionalParameter("master_uuid") diff --git a/src/kudu/tools/tool_test_util.cc b/src/kudu/tools/tool_test_util.cc index 22abba5c03..6d4f1179f8 100644 --- a/src/kudu/tools/tool_test_util.cc +++ b/src/kudu/tools/tool_test_util.cc @@ -20,18 +20,18 @@ #include "kudu/tools/tool_test_util.h" #include -#include +#include #include #include #include "kudu/gutil/strings/split.h" #include "kudu/gutil/strings/substitute.h" -#include "kudu/util/env.h" -#include "kudu/util/path_util.h" +#include "kudu/tools/tool_action_common.h" #include "kudu/util/status.h" #include "kudu/util/subprocess.h" +using std::map; using std::string; using std::vector; using strings::Split; @@ -41,18 +41,13 @@ namespace kudu { namespace tools { string GetKuduToolAbsolutePath() { - static const string kKuduCtlFileName = "kudu"; - string exe; - CHECK_OK(Env::Default()->GetExecutablePath(&exe)); - const string binroot = DirName(exe); - const string tool_abs_path = JoinPathSegments(binroot, kKuduCtlFileName); - CHECK(Env::Default()->FileExists(tool_abs_path)) - << kKuduCtlFileName << " binary not found at " << tool_abs_path; - return tool_abs_path; + string path; + CHECK_OK(GetKuduToolAbsolutePathSafe(&path)); + return path; } Status RunKuduTool(const vector& args, string* out, string* err, - const std::string& in) { + const string& in, map env_vars) { vector total_args = { GetKuduToolAbsolutePath() }; // Some scenarios might add unsafe flags for testing purposes. @@ -74,7 +69,7 @@ Status RunKuduTool(const vector& args, string* out, string* err, total_args.emplace_back("--openssl_security_level_override=0"); total_args.insert(total_args.end(), args.begin(), args.end()); - return Subprocess::Call(total_args, in, out, err); + return Subprocess::Call(total_args, in, out, err, std::move(env_vars)); } Status RunActionPrependStdoutStderr(const string& arg_str) { diff --git a/src/kudu/tools/tool_test_util.h b/src/kudu/tools/tool_test_util.h index b511c603b1..7aa41e3839 100644 --- a/src/kudu/tools/tool_test_util.h +++ b/src/kudu/tools/tool_test_util.h @@ -19,6 +19,7 @@ #pragma once +#include #include #include @@ -39,10 +40,13 @@ std::string GetKuduToolAbsolutePath(); // // If 'out' or 'err' is set, the tool's stdout or stderr output will be // written to each respectively. +// Optionally allows a passed map of environment variables to be set +// on the kudu tool via 'env_vars'. Status RunKuduTool(const std::vector& args, std::string* out = nullptr, std::string* err = nullptr, - const std::string& in = ""); + const std::string& in = "", + std::map env_vars = {}); // Runs the 'kudu' tool binary with the given argument string, returning an // error prepended with stdout and stderr if the run was unsuccessful.