Skip to content

Commit

Permalink
[FLINK-6376] [yarn] Improve inline code comments related to HDFS dele…
Browse files Browse the repository at this point in the history
…gation token inclusion

This closes apache#3776.
  • Loading branch information
tzulitai committed Jul 1, 2017
1 parent b1f3408 commit e575c6c
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,19 @@ public void install(SecurityUtils.SecurityConfiguration securityConfig) throws S
// supplement with any available tokens
String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
if (fileLocation != null) {
/*
* Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
* used in the context of reading the stored tokens from UGI.
* Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
* loginUser.addCredentials(cred);
* Notify:If UGI use the keytab for login, do not load HDFS delegation token.
*/
// Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
// used in the context of reading the stored tokens from UGI.
// Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
// loginUser.addCredentials(cred);
try {
Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
File.class, org.apache.hadoop.conf.Configuration.class);
Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
securityConfig.getHadoopConfiguration());

// if UGI uses Kerberos keytabs for login, do not load HDFS delegation token since
// the UGI would prefer the delegation token instead, which eventually expires
// and does not fallback to using Kerberos tickets
Method getAllTokensMethod = Credentials.class.getMethod("getAllTokens");
Credentials credentials = new Credentials();
final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
Expand All @@ -83,6 +84,7 @@ public void install(SecurityUtils.SecurityConfiguration securityConfig) throws S
credentials.addToken(id, token);
}
}

Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
Credentials.class);
addCredentialsMethod.invoke(loginUser, credentials);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ public ApplicationReport startAppMaster(JobGraph jobGraph, YarnClient yarnClient
final ContainerLaunchContext amContainer = setupApplicationMasterContainer(hasLogback, hasLog4j, hasKrb5);

if (UserGroupInformation.isSecurityEnabled()) {
//set tokens when security is enable
// set HDFS delegation tokens when security is enabled
LOG.info("Adding delegation token to the AM container..");
Utils.setTokensFor(amContainer, paths, conf);
}
Expand Down
13 changes: 6 additions & 7 deletions flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -457,14 +457,13 @@ static ContainerLaunchContext createTaskExecutorContext(

try (DataOutputBuffer dob = new DataOutputBuffer()) {
log.debug("Adding security tokens to Task Executor Container launch Context....");
/*
* For taskmanager yarn container context, read the tokens from the jobmanager yarn container local flie.
* Notify: must read the tokens from the local file, but not from UGI context.Because if UGI is login
* from Keytab, there is no HDFS degegation token in UGI context.
*/

// For TaskManager YARN container context, read the tokens from the jobmanager yarn container local flie.
// NOTE: must read the tokens from the local file, not from the UGI context, because if UGI is login
// using Kerberos keytabs, there is no HDFS delegation token in the UGI context.
String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
File.class, org.apache.hadoop.conf.Configuration.class);
Method readTokenStorageFileMethod = Credentials.class.getMethod(
"readTokenStorageFile", File.class, org.apache.hadoop.conf.Configuration.class);
Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
new SecurityUtils.SecurityConfiguration(flinkConfig).getHadoopConfiguration());
cred.writeTokenStorageToStream(dob);
Expand Down

0 comments on commit e575c6c

Please sign in to comment.