Skip to content

Commit

Permalink
[FLINK-28330][runtime][security] Remove old delegation token framework code
Browse files Browse the repository at this point in the history
  • Loading branch information
gaborgsomogyi authored Nov 14, 2022
1 parent 8205dde commit 480e6ed
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 136 deletions.
117 changes: 0 additions & 117 deletions flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.security.token.DelegationTokenConverter;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.function.FunctionWithException;
Expand All @@ -35,7 +34,6 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
Expand All @@ -55,10 +53,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -198,119 +194,6 @@ private static LocalResource registerLocalResource(
resourceType);
}

/**
 * Populates the AM container launch context with Hadoop security tokens.
 *
 * <p>When {@code obtainingDelegationTokens} is {@code true}, delegation tokens are fetched for
 * the given HDFS paths and, if HBase is on the classpath and configured for Kerberos, for HBase
 * as well. Tokens already attached to the current UGI user are always forwarded, so the AM can
 * authenticate with the same identities as the submitting user.
 *
 * @param amContainer launch context whose token buffer is set
 * @param paths HDFS paths for which name-node delegation tokens are obtained
 * @param conf Hadoop configuration used for token retrieval
 * @param obtainingDelegationTokens whether HDFS/HBase delegation tokens should be fetched
 * @throws IOException if token retrieval or credential serialization fails
 */
public static void setTokensFor(
        ContainerLaunchContext amContainer,
        List<Path> paths,
        Configuration conf,
        boolean obtainingDelegationTokens)
        throws IOException {
    Credentials credentials = new Credentials();

    if (obtainingDelegationTokens) {
        LOG.info("Obtaining delegation tokens for HDFS and HBase.");
        // for HDFS
        TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
        // for HBase (no-op when HBase is not packaged with the application)
        obtainTokenForHBase(credentials, conf);
    } else {
        LOG.info("Delegation token retrieval for HDFS and HBase is disabled.");
    }

    // Tokens already present on the current user (e.g. from an earlier login) are always added.
    UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();

    Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
    for (Token<? extends TokenIdentifier> token : usrTok) {
        // Parameterized logging avoids eager string concatenation when INFO is disabled.
        LOG.info("Adding user token {} with {}", token.getService(), token);
        credentials.addToken(token.getService(), token);
    }

    ByteBuffer tokens = ByteBuffer.wrap(DelegationTokenConverter.serialize(credentials));
    amContainer.setTokens(tokens);
}

/**
 * Obtains a Kerberos security token for HBase via reflection and adds it to the given
 * credentials.
 *
 * <p>HBase classes are loaded reflectively so this method works whether or not HBase is packaged
 * with the application; when HBase is absent, the method logs and returns without failing. Both
 * the HBase 1.x entry point ({@code TokenUtil.obtainToken(Configuration)}) and the HBase 2.x
 * entry point ({@code TokenUtil.obtainToken(Connection)}) are attempted. No-op unless Hadoop
 * security is enabled and HBase is configured for Kerberos authentication.
 *
 * @param credentials credentials object the obtained token is added to
 * @param conf Hadoop configuration; HBase resources are merged into it
 * @throws IOException if token retrieval fails with an I/O error
 */
private static void obtainTokenForHBase(Credentials credentials, Configuration conf)
        throws IOException {
    if (UserGroupInformation.isSecurityEnabled()) {
        LOG.info("Attempting to obtain Kerberos security token for HBase");
        try {
            // ----
            // Intended call: HBaseConfiguration.addHbaseResources(conf);
            Class.forName("org.apache.hadoop.hbase.HBaseConfiguration")
                    .getMethod("addHbaseResources", Configuration.class)
                    .invoke(null, conf);
            // ----

            LOG.info("HBase security setting: {}", conf.get("hbase.security.authentication"));

            if (!"kerberos".equals(conf.get("hbase.security.authentication"))) {
                LOG.info("HBase has not been configured to use Kerberos.");
                return;
            }

            Token<?> token;
            try {
                LOG.info("Obtaining Kerberos security token for HBase");
                // ----
                // Intended call: Token<AuthenticationTokenIdentifier> token =
                // TokenUtil.obtainToken(conf);
                token =
                        (Token<?>)
                                Class.forName(
                                                "org.apache.hadoop.hbase.security.token.TokenUtil")
                                        .getMethod("obtainToken", Configuration.class)
                                        .invoke(null, conf);
                // ----
            } catch (NoSuchMethodException e) {
                // for HBase 2 (the Configuration-based overload was removed)

                // ----
                // Intended call: ConnectionFactory connectionFactory =
                // ConnectionFactory.createConnection(conf);
                Closeable connectionFactory =
                        (Closeable)
                                Class.forName(
                                                "org.apache.hadoop.hbase.client.ConnectionFactory")
                                        .getMethod("createConnection", Configuration.class)
                                        .invoke(null, conf);
                // ----
                try {
                    Class<?> connectionClass =
                            Class.forName("org.apache.hadoop.hbase.client.Connection");
                    // ----
                    // Intended call: Token<AuthenticationTokenIdentifier> token =
                    // TokenUtil.obtainToken(connectionFactory);
                    token =
                            (Token<?>)
                                    Class.forName(
                                                    "org.apache.hadoop.hbase.security.token.TokenUtil")
                                            .getMethod("obtainToken", connectionClass)
                                            .invoke(null, connectionFactory);
                    // ----
                } finally {
                    // Close the reflectively created Connection on every path; previously it
                    // was only closed on success, leaking the connection when the reflective
                    // obtainToken call threw.
                    if (null != connectionFactory) {
                        connectionFactory.close();
                    }
                }
            }

            if (token == null) {
                LOG.error("No Kerberos security token for HBase available");
                return;
            }

            credentials.addToken(token.getService(), token);
            LOG.info("Added HBase Kerberos security token to credentials.");
        } catch (ClassNotFoundException
                | NoSuchMethodException
                | IllegalAccessException
                | InvocationTargetException e) {
            // HBase missing from the classpath is an expected, non-fatal condition.
            LOG.info(
                    "HBase is not available (not packaged with this application): {} : \"{}\".",
                    e.getClass().getSimpleName(),
                    e.getMessage());
        }
    }
}

/**
* Copied method from org.apache.hadoop.yarn.util.Apps. It was broken by YARN-1824 (2.4.0) and
* fixed for 2.4.1 by https://issues.apache.org/jira/browse/YARN-1931
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.flink.runtime.security.token.DelegationTokenConverter;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.runtime.security.token.KerberosDelegationTokenManager;
import org.apache.flink.runtime.security.token.KerberosLoginProvider;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkException;
Expand Down Expand Up @@ -113,7 +114,6 @@
import java.net.URLDecoder;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -1150,26 +1150,14 @@ private ApplicationReport startAppMaster(
final ContainerLaunchContext amContainer =
setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec);

// New delegation token framework
if (configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)) {
setTokensFor(amContainer);
}
// Old delegation token framework
if (UserGroupInformation.isSecurityEnabled()) {
LOG.info("Adding delegation token to the AM container.");
final List<Path> pathsToObtainToken = new ArrayList<>();
boolean fetchToken =
configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
if (fetchToken) {
List<Path> yarnAccessList =
ConfigUtils.decodeListFromConfig(
configuration,
SecurityOptions.KERBEROS_HADOOP_FILESYSTEMS_TO_ACCESS,
Path::new);
pathsToObtainToken.addAll(yarnAccessList);
pathsToObtainToken.addAll(fileUploader.getRemotePaths());
KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
if (kerberosLoginProvider.isLoginPossible()) {
setTokensFor(amContainer);
} else {
LOG.info(
"Cannot use kerberos delegation token manager, no valid kerberos credentials provided.");
}
Utils.setTokensFor(amContainer, pathsToObtainToken, yarnConfiguration, fetchToken);
}

amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
Expand Down

0 comments on commit 480e6ed

Please sign in to comment.