Skip to content

Commit

Permalink
HIVE-8310 : RetryingHMSHandler is not used when kerberos auth enabled…
Browse files Browse the repository at this point in the history
… (Thejas Nair, reviewed by Ashutosh Chauhan)

git-svn-id: https://svn.apache.org/repos/asf/hive/trunk@1629777 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Thejas Nair committed Oct 6, 2014
1 parent 2f9df52 commit 401bea6
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class TestDBTokenStore extends TestCase{
public void testDBTokenStore() throws TokenStoreException, MetaException, IOException {

DelegationTokenStore ts = new DBTokenStore();
ts.setStore(new HMSHandler("Test handler"));
ts.setStore(new HMSHandler("Test handler").getMS());
assertEquals(0, ts.getMasterKeys().length);
assertEquals(false,ts.removeMasterKey(-1));
try{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5489,13 +5489,20 @@ public boolean set_aggr_stats_for(SetPartitionsStatsRequest request)
}


public static IHMSHandler newHMSHandler(String name, HiveConf hiveConf) throws MetaException {
return newHMSHandler(name, hiveConf, false);
public static IHMSHandler newRetryingHMSHandler(IHMSHandler baseHandler, HiveConf hiveConf)
throws MetaException {
return newRetryingHMSHandler(baseHandler, hiveConf, false);
}

public static IHMSHandler newRetryingHMSHandler(IHMSHandler baseHandler, HiveConf hiveConf,
boolean local) throws MetaException {
return RetryingHMSHandler.getProxy(hiveConf, baseHandler, local);
}

public static IHMSHandler newHMSHandler(String name, HiveConf hiveConf, boolean local)
public static Iface newRetryingHMSHandler(String name, HiveConf conf, boolean local)
throws MetaException {
return RetryingHMSHandler.getProxy(hiveConf, name, local);
HMSHandler baseHandler = new HiveMetaStore.HMSHandler(name, conf, false);
return RetryingHMSHandler.getProxy(conf, baseHandler, local);
}

/**
Expand Down Expand Up @@ -5704,6 +5711,9 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,

TProcessor processor;
TTransportFactory transFactory;
HMSHandler baseHandler = new HiveMetaStore.HMSHandler("new db based metaserver", conf,
false);
IHMSHandler handler = newRetryingHMSHandler(baseHandler, conf);
if (useSasl) {
// we are in secure mode.
if (useFramedTransport) {
Expand All @@ -5713,17 +5723,14 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,
conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_KEYTAB_FILE),
conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL));
// start delegation token manager
HMSHandler hmsHandler = new HMSHandler("new db based metaserver", conf);
saslServer.startDelegationTokenSecretManager(conf, hmsHandler);
saslServer.startDelegationTokenSecretManager(conf, baseHandler.getMS());
transFactory = saslServer.createTransportFactory(
MetaStoreUtils.getMetaStoreSaslProperties(conf));
processor = saslServer.wrapProcessor(
new ThriftHiveMetastore.Processor<HMSHandler>(hmsHandler));
new ThriftHiveMetastore.Processor<IHMSHandler>(handler));
LOG.info("Starting DB backed MetaStore Server in Secure Mode");
} else {
// we are in unsecure mode.
IHMSHandler handler = newHMSHandler("new db based metaserver", conf);

if (conf.getBoolVar(ConfVars.METASTORE_EXECUTE_SET_UGI)) {
transFactory = useFramedTransport ?
new ChainedTTransportFactory(new TFramedTransport.Factory(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public HiveMetaStoreClient(HiveConf conf, HiveMetaHookLoader hookLoader)
if (localMetaStore) {
// instantiate the metastore server handler directly instead of connecting
// through the network
client = HiveMetaStore.newHMSHandler("hive client", conf, true);
client = HiveMetaStore.newRetryingHMSHandler("hive client", conf, true);
isConnected = true;
snapshotActiveConf();
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,30 @@ static class MetaStoreInitData {
}

/**
* Updates the connection URL in hiveConf using the hook
*
* Updates the connection URL in hiveConf using the hook (if a hook has been
* set using hive.metastore.ds.connection.url.hook property)
* @param originalConf - original configuration used to look up hook settings
* @param activeConf - the configuration file in use for looking up db url
* @param badUrl
* @param updateData - hook information
* @return true if a new connection URL was loaded into the thread local
* configuration
* @throws MetaException
*/
static boolean updateConnectionURL(HiveConf hiveConf, Configuration conf,
static boolean updateConnectionURL(HiveConf originalConf, Configuration activeConf,
String badUrl, MetaStoreInitData updateData)
throws MetaException {
String connectUrl = null;
String currentUrl = MetaStoreInit.getConnectionURL(conf);
String currentUrl = MetaStoreInit.getConnectionURL(activeConf);
try {
// We always call init because the hook name in the configuration could
// have changed.
MetaStoreInit.initConnectionUrlHook(hiveConf, updateData);
MetaStoreInit.initConnectionUrlHook(originalConf, updateData);
if (updateData.urlHook != null) {
if (badUrl != null) {
updateData.urlHook.notifyBadConnectionUrl(badUrl);
}
connectUrl = updateData.urlHook.getJdoConnectionUrl(hiveConf);
connectUrl = updateData.urlHook.getJdoConnectionUrl(originalConf);
}
} catch (Exception e) {
LOG.error("Exception while getting connection URL from the hook: " +
Expand All @@ -71,7 +76,7 @@ static boolean updateConnectionURL(HiveConf hiveConf, Configuration conf,
String.format("Overriding %s with %s",
HiveConf.ConfVars.METASTORECONNECTURLKEY.toString(),
connectUrl));
conf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.toString(),
activeConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.toString(),
connectUrl);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,34 +41,34 @@ public class RetryingHMSHandler implements InvocationHandler {

private static final Log LOG = LogFactory.getLog(RetryingHMSHandler.class);

private final IHMSHandler base;
private final IHMSHandler baseHandler;
private final MetaStoreInit.MetaStoreInitData metaStoreInitData =
new MetaStoreInit.MetaStoreInitData();

private final HiveConf hiveConf; // base configuration
private final Configuration configuration; // active configuration
private final HiveConf origConf; // base configuration
private final Configuration activeConf; // active configuration

private RetryingHMSHandler(HiveConf hiveConf, String name, boolean local) throws MetaException {
this.hiveConf = hiveConf;
this.base = new HiveMetaStore.HMSHandler(name, hiveConf, false);
private RetryingHMSHandler(HiveConf hiveConf, IHMSHandler baseHandler, boolean local) throws MetaException {
this.origConf = hiveConf;
this.baseHandler = baseHandler;
if (local) {
base.setConf(hiveConf); // tests expect configuration changes applied directly to metastore
baseHandler.setConf(hiveConf); // tests expect configuration changes applied directly to metastore
}
configuration = base.getConf();
activeConf = baseHandler.getConf();

// This has to be called before initializing the instance of HMSHandler
// Using the hook on startup ensures that the hook always has priority
// over settings in *.xml. The thread local conf needs to be used because at this point
// it has already been initialized using hiveConf.
MetaStoreInit.updateConnectionURL(hiveConf, getConf(), null, metaStoreInitData);
MetaStoreInit.updateConnectionURL(hiveConf, getActiveConf(), null, metaStoreInitData);

base.init();
baseHandler.init();
}

public static IHMSHandler getProxy(HiveConf hiveConf, String name, boolean local)
public static IHMSHandler getProxy(HiveConf hiveConf, IHMSHandler baseHandler, boolean local)
throws MetaException {

RetryingHMSHandler handler = new RetryingHMSHandler(hiveConf, name, local);
RetryingHMSHandler handler = new RetryingHMSHandler(hiveConf, baseHandler, local);

return (IHMSHandler) Proxy.newProxyInstance(
RetryingHMSHandler.class.getClassLoader(),
Expand All @@ -79,15 +79,15 @@ public static IHMSHandler getProxy(HiveConf hiveConf, String name, boolean local
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {

boolean gotNewConnectUrl = false;
boolean reloadConf = HiveConf.getBoolVar(hiveConf,
boolean reloadConf = HiveConf.getBoolVar(origConf,
HiveConf.ConfVars.HMSHANDLERFORCERELOADCONF);
long retryInterval = HiveConf.getTimeVar(hiveConf,
long retryInterval = HiveConf.getTimeVar(origConf,
HiveConf.ConfVars.HMSHANDLERINTERVAL, TimeUnit.MILLISECONDS);
int retryLimit = HiveConf.getIntVar(hiveConf,
int retryLimit = HiveConf.getIntVar(origConf,
HiveConf.ConfVars.HMSHANDLERATTEMPTS);

if (reloadConf) {
MetaStoreInit.updateConnectionURL(hiveConf, getConf(),
MetaStoreInit.updateConnectionURL(origConf, getActiveConf(),
null, metaStoreInitData);
}

Expand All @@ -96,9 +96,9 @@ public Object invoke(final Object proxy, final Method method, final Object[] arg
while (true) {
try {
if (reloadConf || gotNewConnectUrl) {
base.setConf(getConf());
baseHandler.setConf(getActiveConf());
}
return method.invoke(base, args);
return method.invoke(baseHandler, args);

} catch (javax.jdo.JDOException e) {
caughtException = e;
Expand Down Expand Up @@ -158,13 +158,13 @@ public Object invoke(final Object proxy, final Method method, final Object[] arg
Thread.sleep(retryInterval);
// If we have a connection error, the JDO connection URL hook might
// provide us with a new URL to access the datastore.
String lastUrl = MetaStoreInit.getConnectionURL(getConf());
gotNewConnectUrl = MetaStoreInit.updateConnectionURL(hiveConf, getConf(),
String lastUrl = MetaStoreInit.getConnectionURL(getActiveConf());
gotNewConnectUrl = MetaStoreInit.updateConnectionURL(origConf, getActiveConf(),
lastUrl, metaStoreInitData);
}
}

public Configuration getConf() {
return configuration;
public Configuration getActiveConf() {
return activeConf;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,18 +108,17 @@ public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() throws
return delTokenIdents;
}

private Object hmsHandler;
private Object rawStore;

@Override
public void setStore(Object hms) throws TokenStoreException {
hmsHandler = hms;
public void setStore(Object rawStore) throws TokenStoreException {
this.rawStore = rawStore;
}

private Object invokeOnRawStore(String methName, Object[] params, Class<?> ... paramTypes)
throws TokenStoreException{

try {
Object rawStore = hmsHandler.getClass().getMethod("getMS").invoke(hmsHandler);
return rawStore.getClass().getMethod(methName, paramTypes).invoke(rawStore, params);
} catch (IllegalArgumentException e) {
throw new TokenStoreException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ protected DelegationTokenStore getTokenStore(Configuration conf)
}

@Override
public void startDelegationTokenSecretManager(Configuration conf, Object hms)
public void startDelegationTokenSecretManager(Configuration conf, Object rawStore)
throws IOException{
long secretKeyInterval =
conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY,
Expand All @@ -430,7 +430,7 @@ public void startDelegationTokenSecretManager(Configuration conf, Object hms)
DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);

DelegationTokenStore dts = getTokenStore(conf);
dts.setStore(hms);
dts.setStore(rawStore);
secretManager = new TokenStoreDelegationTokenSecretManager(secretKeyInterval,
tokenMaxLifetime,
tokenRenewInterval,
Expand Down

0 comments on commit 401bea6

Please sign in to comment.