Skip to content

Commit

Permalink
[FLINK-28608][runtime][security]Make Hadoop FS token renewer configur…
Browse files Browse the repository at this point in the history
…able
  • Loading branch information
gaborgsomogyi authored Jul 21, 2022
1 parent 5f0c4ab commit 0c5108c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public Optional<Long> obtainDelegationTokens(Credentials credentials) throws Exc
Clock clock = Clock.systemDefaultZone();
Set<FileSystem> fileSystemsToAccess = getFileSystemsToAccess();

obtainDelegationTokens(null, fileSystemsToAccess, credentials);
obtainDelegationTokens(getRenewer(), fileSystemsToAccess, credentials);

// Get the token renewal interval if it is not set. It will be called only once.
if (tokenRenewalInterval == null) {
Expand All @@ -88,6 +88,13 @@ public Optional<Long> obtainDelegationTokens(Credentials credentials) throws Exc
interval -> getTokenRenewalDate(clock, credentials, interval));
}

@VisibleForTesting
@Nullable
String getRenewer() {
return flinkConfiguration.getString(
String.format("security.kerberos.token.provider.%s.renewer", serviceName()), null);
}

private Set<FileSystem> getFileSystemsToAccess() throws IOException {
Set<FileSystem> result = new HashSet<>();

Expand Down Expand Up @@ -155,6 +162,7 @@ protected void obtainDelegationTokens(
});
}

@VisibleForTesting
Optional<Long> getTokenRenewalInterval(Clock clock, Set<FileSystem> fileSystemsToAccess)
throws IOException {
// We cannot use the tokens generated with renewer yarn
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import static java.time.Instant.ofEpochMilli;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

/** Test for {@link HadoopFSDelegationTokenProvider}. */
class HadoopFSDelegationTokenProviderITCase {
Expand Down Expand Up @@ -66,6 +67,24 @@ public long renew(Configuration conf) {
}
}

@Test
public void getRenewerShouldReturnNullByDefault() throws Exception {
HadoopFSDelegationTokenProvider provider = new HadoopFSDelegationTokenProvider();
provider.init(new org.apache.flink.configuration.Configuration());
assertNull(provider.getRenewer());
}

@Test
public void getRenewerShouldReturnConfiguredRenewer() throws Exception {
String renewer = "testRenewer";
HadoopFSDelegationTokenProvider provider = new HadoopFSDelegationTokenProvider();
org.apache.flink.configuration.Configuration configuration =
new org.apache.flink.configuration.Configuration();
configuration.setString("security.kerberos.token.provider.hadoopfs.renewer", renewer);
provider.init(configuration);
assertEquals(renewer, provider.getRenewer());
}

@Test
public void getTokenRenewalIntervalShouldReturnNoneWhenNoTokens() throws IOException {
HadoopFSDelegationTokenProvider provider =
Expand Down

0 comments on commit 0c5108c

Please sign in to comment.