Skip to content

Commit

Permalink
hms-tool: lookup master addresses config from master
Browse files Browse the repository at this point in the history
The HMS check and fix tools currently use the master addresses
configuration provided on the command line to validate and fix metadata
in HMS table entries. This only works correctly if the administrator
inputs the master addresses exactly as its configured on the masters. If
the hostports are reordered or use a slightly different format which
resolves to the same addresses, then metadata will be flagged as stale
and rewritten unnecessarily. As an example, the administrator could pass
'localhost' as the master address if they ran it on the master node, but
this wouldn't be appropriate to write into the HMS.

This commit changes the handling so that the tools use the master
addresses returned from the ConnectToCluster mechanism already present
in the client, which should match the master addresses config of the
master exactly.

Change-Id: If170cebe6e5d7fa05fbd7faf18755aa57bdfeeec
Reviewed-on: http://gerrit.cloudera.org:8080/11083
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <[email protected]>
  • Loading branch information
danburkert committed Aug 1, 2018
1 parent f847726 commit b2c6428
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 22 deletions.
12 changes: 11 additions & 1 deletion src/kudu/client/client-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "kudu/client/master_rpc.h"
#include "kudu/client/meta_cache.h"
#include "kudu/client/schema.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/partition.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.h"
Expand All @@ -46,6 +47,7 @@
#include "kudu/master/master.h"
#include "kudu/master/master.pb.h"
#include "kudu/master/master.proxy.h"
#include "kudu/rpc/connection.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/request_tracker.h"
#include "kudu/rpc/rpc_controller.h"
Expand All @@ -60,7 +62,6 @@
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/thread_restrictions.h"
#include "kudu/rpc/connection.h"

using std::pair;
using std::set;
Expand Down Expand Up @@ -702,6 +703,10 @@ void KuduClient::Data::ConnectedToClusterCb(

if (status.ok()) {
leader_master_hostport_ = HostPort(leader_hostname, leader_addr.port());
master_hostports_.clear();
for (const auto& hostport : connect_response.master_addrs()) {
master_hostports_.emplace_back(HostPort(hostport.host(), hostport.port()));
}
master_proxy_.reset(new MasterServiceProxy(messenger_, leader_addr, leader_hostname));
master_proxy_->set_user_credentials(user_credentials_);
}
Expand Down Expand Up @@ -822,6 +827,11 @@ HostPort KuduClient::Data::leader_master_hostport() const {
return leader_master_hostport_;
}

const vector<HostPort>& KuduClient::Data::master_hostports() const {
std::lock_guard<simple_spinlock> l(leader_master_lock_);
return master_hostports_;
}

shared_ptr<master::MasterServiceProxy> KuduClient::Data::master_proxy() const {
std::lock_guard<simple_spinlock> l(leader_master_lock_);
return master_proxy_;
Expand Down
8 changes: 7 additions & 1 deletion src/kudu/client/client-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ class KuduClient::Data {

HostPort leader_master_hostport() const;

const std::vector<HostPort>& master_hostports() const;

uint64_t GetLatestObservedTimestamp() const;

void UpdateLatestObservedTimestamp(uint64_t timestamp);
Expand Down Expand Up @@ -254,6 +256,10 @@ class KuduClient::Data {
// ConnectToClusterAsync.
HostPort leader_master_hostport_;

// The master RPC host ports as configured on the most recently connected to
// leader master in ConnectedToClusterCb.
std::vector<HostPort> master_hostports_;

// Proxy to the leader master.
std::shared_ptr<master::MasterServiceProxy> master_proxy_;

Expand All @@ -266,7 +272,7 @@ class KuduClient::Data {
std::vector<StatusCallback> leader_master_callbacks_primary_creds_;

// Protects 'leader_master_rpc_{any,primary}_creds_',
// 'leader_master_hostport_', and 'master_proxy_'.
// 'leader_master_hostport_', 'master_hostports_', and 'master_proxy_'.
//
// See: KuduClient::Data::ConnectToClusterAsync for a more
// in-depth explanation of why this is needed and how it works.
Expand Down
24 changes: 13 additions & 11 deletions src/kudu/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,18 @@ class PartitionSchema;
class SecurityUnknownTskTest;

namespace client {
class KuduClient;
class KuduTableAlterer;
}

namespace tools {
class LeaderMasterProxy;
std::string GetMasterAddresses(const client::KuduClient&);
void SetAlterExternalCatalogs(client::KuduTableAlterer*, bool);
} // namespace tools

namespace client {

class KuduClient;
class KuduDelete;
class KuduInsert;
class KuduLoggingCallback;
Expand Down Expand Up @@ -538,26 +539,27 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
private:
class KUDU_NO_EXPORT Data;

friend class internal::Batcher;
friend class internal::GetTableSchemaRpc;
friend class internal::LookupRpc;
friend class internal::MetaCache;
friend class internal::RemoteTablet;
friend class internal::RemoteTabletServer;
friend class internal::WriteRpc;
friend class ConnectToClusterBaseTest;
friend class ClientTest;
friend class ConnectToClusterBaseTest;
friend class KuduClientBuilder;
friend class KuduPartitionerBuilder;
friend class KuduScanner;
friend class KuduScanToken;
friend class KuduScanTokenBuilder;
friend class KuduScanner;
friend class KuduSession;
friend class KuduTable;
friend class KuduTableAlterer;
friend class KuduTableCreator;
friend class ::kudu::SecurityUnknownTskTest;
friend class internal::Batcher;
friend class internal::GetTableSchemaRpc;
friend class internal::LookupRpc;
friend class internal::MetaCache;
friend class internal::RemoteTablet;
friend class internal::RemoteTabletServer;
friend class internal::WriteRpc;
friend class kudu::SecurityUnknownTskTest;
friend class tools::LeaderMasterProxy;
friend std::string tools::GetMasterAddresses(const client::KuduClient&);

FRIEND_TEST(kudu::ClientStressTest, TestUniqueClientIds);
FRIEND_TEST(ClientTest, TestGetSecurityInfoFromMaster);
Expand Down
4 changes: 4 additions & 0 deletions src/kudu/tools/tool_action_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,10 @@ void SetAlterExternalCatalogs(client::KuduTableAlterer* alterer, bool alter_exte
alterer->alter_external_catalogs(alter_external_catalogs);
}

string GetMasterAddresses(const client::KuduClient& client) {
return HostPort::ToCommaSeparatedString(client.data_->master_hostports());
}

Status PrintServerStatus(const string& address, uint16_t default_port) {
ServerStatusPB status;
RETURN_NOT_OK(GetServerStatus(address, default_port, &status));
Expand Down
3 changes: 3 additions & 0 deletions src/kudu/tools/tool_action_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ Status SetServerFlag(const std::string& address, uint16_t default_port,
// Set the non-public 'alter_external_catalogs' option on a KuduTableAlterer.
void SetAlterExternalCatalogs(client::KuduTableAlterer* alterer, bool alter_external_catalogs);

// Get the configured master addresses on the most recently connected to leader master.
std::string GetMasterAddresses(const client::KuduClient& client);

// A table of data to present to the user.
//
// Supports formatting based on the --format flag.
Expand Down
26 changes: 17 additions & 9 deletions src/kudu/tools/tool_action_hms.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,27 @@ Status RenameTableInKuduCatalog(KuduClient* kudu_client,

Status Init(const RunnerContext& context,
shared_ptr<KuduClient>* kudu_client,
unique_ptr<HmsCatalog>* hms_catalog) {
unique_ptr<HmsCatalog>* hms_catalog,
string* master_addrs) {
const string& master_addrs_flag = FindOrDie(context.required_args, kMasterAddressesArg);
vector<string> master_addrs = Split(master_addrs_flag, ",");

// Create a Kudu Client.
RETURN_NOT_OK(KuduClientBuilder()
.default_rpc_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms))
.master_server_addrs(master_addrs)
.master_server_addrs(Split(master_addrs_flag, ","))
.Build(kudu_client));

// Get the configured master addresses from the leader master. It's critical
// that the check and fix tools use the exact same master address
// configuration that the masters do, otherwise the HMS table entries will
// disagree on the master addresses property.
*master_addrs = GetMasterAddresses(**kudu_client);

if (FLAGS_hive_metastore_uris.empty()) {
// Lookup the HMS URIs and SASL config from the master configuration.
vector<GetFlagsResponsePB_Flag> flags;
RETURN_NOT_OK(GetServerFlags(master_addrs[0], master::Master::kDefaultPort, false, {}, &flags));
RETURN_NOT_OK(GetServerFlags(vector<string>(Split(*master_addrs, ","))[0],
master::Master::kDefaultPort, false, {}, &flags));

auto hms_uris = std::find_if(flags.begin(), flags.end(),
[] (const GetFlagsResponsePB_Flag& flag) {
Expand Down Expand Up @@ -146,7 +153,8 @@ Status Init(const RunnerContext& context,
Status HmsDowngrade(const RunnerContext& context) {
shared_ptr<KuduClient> kudu_client;
unique_ptr<HmsCatalog> hms_catalog;
Init(context, &kudu_client, &hms_catalog);
string master_addrs;
Init(context, &kudu_client, &hms_catalog, &master_addrs);

// 1. Identify all Kudu tables in the HMS entries.
vector<hive::Table> hms_tables;
Expand Down Expand Up @@ -377,10 +385,10 @@ Status AnalyzeCatalogs(const string& master_addrs,
Status CheckHmsMetadata(const RunnerContext& context) {
// TODO(dan): check that the critical HMS configuration flags
// (--hive_metastore_uris, --hive_metastore_sasl_enabled) match on all masters.
const string& master_addrs = FindOrDie(context.required_args, kMasterAddressesArg);
shared_ptr<KuduClient> kudu_client;
unique_ptr<HmsCatalog> hms_catalog;
RETURN_NOT_OK(Init(context, &kudu_client, &hms_catalog));
string master_addrs;
RETURN_NOT_OK(Init(context, &kudu_client, &hms_catalog, &master_addrs));

CatalogReport report;
RETURN_NOT_OK(AnalyzeCatalogs(master_addrs, hms_catalog.get(), kudu_client.get(), &report));
Expand Down Expand Up @@ -467,10 +475,10 @@ string TableIdent(const KuduTable& table) {
// failing due to duplicate table being present are logged and execution
// continues.
Status FixHmsMetadata(const RunnerContext& context) {
const string& master_addrs = FindOrDie(context.required_args, kMasterAddressesArg);
shared_ptr<KuduClient> kudu_client;
unique_ptr<HmsCatalog> hms_catalog;
RETURN_NOT_OK(Init(context, &kudu_client, &hms_catalog));
string master_addrs;
RETURN_NOT_OK(Init(context, &kudu_client, &hms_catalog, &master_addrs));

CatalogReport report;
RETURN_NOT_OK(AnalyzeCatalogs(master_addrs, hms_catalog.get(), kudu_client.get(), &report));
Expand Down

0 comments on commit b2c6428

Please sign in to comment.