Skip to content

Commit

Permalink
[FLINK-30339][runtime][security] Add a unified delegation token manager
Browse files Browse the repository at this point in the history
  • Loading branch information
gaborgsomogyi authored and mbalassi committed Dec 20, 2022
1 parent 6e5f869 commit 37ad343
Show file tree
Hide file tree
Showing 13 changed files with 254 additions and 254 deletions.
5 changes: 5 additions & 0 deletions docs/content.zh/docs/deployment/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ Flink's network connections can be secured via SSL. Please refer to the [SSL Set

{{< generated/security_ssl_section >}}

### Delegation token

Flink has a pluggable authentication protocol agnostic delegation token framework.

{{< generated/security_delegation_token_section >}}

### Auth with External Systems

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,6 @@
<td>List&lt;String&gt;</td>
<td>A comma-separated list of Kerberos-secured Hadoop filesystems Flink is going to access. For example, security.kerberos.access.hadoopFileSystems=hdfs://namenode2:9002,hdfs://namenode3:9003. The JobManager needs to have access to these filesystems to retrieve the security tokens.</td>
</tr>
<tr>
<td><h5>security.kerberos.fetch.delegation-token</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Indicates whether to fetch the delegation tokens for external services the Flink job needs to contact. Only HDFS and HBase are supported. It is used in Yarn deployments. If true, Flink will fetch HDFS and HBase delegation tokens and inject them into Yarn AM containers. If false, Flink will assume that the delegation tokens are managed outside of Flink. As a consequence, it will not fetch delegation tokens for HDFS and HBase. You may need to disable this option, if you rely on submission mechanisms, e.g. Apache Oozie, to handle delegation tokens.</td>
</tr>
<tr>
<td><h5>security.kerberos.login.contexts</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down Expand Up @@ -50,17 +44,5 @@
<td>Duration</td>
<td>The time period when keytab login happens automatically in order to always have a valid TGT.</td>
</tr>
<tr>
<td><h5>security.kerberos.tokens.renewal.retry.backoff</h5></td>
<td style="word-wrap: break-word;">1 h</td>
<td>Duration</td>
<td>The time period how long to wait before retrying to obtain new delegation tokens after a failure.</td>
</tr>
<tr>
<td><h5>security.kerberos.tokens.renewal.time-ratio</h5></td>
<td style="word-wrap: break-word;">0.75</td>
<td>Double</td>
<td>Ratio of the tokens's expiration time when new credentials should be re-obtained.</td>
</tr>
</tbody>
</table>
36 changes: 18 additions & 18 deletions docs/layouts/shortcodes/generated/security_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,30 @@
<td>List&lt;String&gt;</td>
<td>List of factories that should be used to instantiate a security context. If multiple are configured, Flink will use the first compatible factory. You should have a NoOpSecurityContextFactory in this list as a fallback.</td>
</tr>
<tr>
<td><h5>security.delegation.tokens.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Indicates whether to start delegation tokens system for external services.</td>
</tr>
<tr>
<td><h5>security.delegation.tokens.renewal.retry.backoff</h5></td>
<td style="word-wrap: break-word;">1 h</td>
<td>Duration</td>
<td>The time period how long to wait before retrying to obtain new delegation tokens after a failure.</td>
</tr>
<tr>
<td><h5>security.delegation.tokens.renewal.time-ratio</h5></td>
<td style="word-wrap: break-word;">0.75</td>
<td>Double</td>
<td>Ratio of the tokens's expiration time when new credentials should be re-obtained.</td>
</tr>
<tr>
<td><h5>security.kerberos.access.hadoopFileSystems</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>List&lt;String&gt;</td>
<td>A comma-separated list of Kerberos-secured Hadoop filesystems Flink is going to access. For example, security.kerberos.access.hadoopFileSystems=hdfs://namenode2:9002,hdfs://namenode3:9003. The JobManager needs to have access to these filesystems to retrieve the security tokens.</td>
</tr>
<tr>
<td><h5>security.kerberos.fetch.delegation-token</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Indicates whether to fetch the delegation tokens for external services the Flink job needs to contact. Only HDFS and HBase are supported. It is used in Yarn deployments. If true, Flink will fetch HDFS and HBase delegation tokens and inject them into Yarn AM containers. If false, Flink will assume that the delegation tokens are managed outside of Flink. As a consequence, it will not fetch delegation tokens for HDFS and HBase. You may need to disable this option, if you rely on submission mechanisms, e.g. Apache Oozie, to handle delegation tokens.</td>
</tr>
<tr>
<td><h5>security.kerberos.krb5-conf.path</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down Expand Up @@ -62,18 +74,6 @@
<td>Duration</td>
<td>The time period when keytab login happens automatically in order to always have a valid TGT.</td>
</tr>
<tr>
<td><h5>security.kerberos.tokens.renewal.retry.backoff</h5></td>
<td style="word-wrap: break-word;">1 h</td>
<td>Duration</td>
<td>The time period how long to wait before retrying to obtain new delegation tokens after a failure.</td>
</tr>
<tr>
<td><h5>security.kerberos.tokens.renewal.time-ratio</h5></td>
<td style="word-wrap: break-word;">0.75</td>
<td>Double</td>
<td>Ratio of the tokens's expiration time when new credentials should be re-obtained.</td>
</tr>
<tr>
<td><h5>security.module.factory.classes</h5></td>
<td style="word-wrap: break-word;">"org.apache.flink.runtime.security.modules.HadoopModuleFactory";<wbr>"org.apache.flink.runtime.security.modules.JaasModuleFactory";<wbr>"org.apache.flink.runtime.security.modules.ZookeeperModuleFactory"</td>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<table class="configuration table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 55%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>security.delegation.tokens.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Indicates whether to start delegation tokens system for external services.</td>
</tr>
<tr>
<td><h5>security.delegation.tokens.renewal.retry.backoff</h5></td>
<td style="word-wrap: break-word;">1 h</td>
<td>Duration</td>
<td>The time period how long to wait before retrying to obtain new delegation tokens after a failure.</td>
</tr>
<tr>
<td><h5>security.delegation.tokens.renewal.time-ratio</h5></td>
<td style="word-wrap: break-word;">0.75</td>
<td>Double</td>
<td>Ratio of the tokens's expiration time when new credentials should be re-obtained.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public static final class Sections {

public static final String SECURITY_SSL = "security_ssl";
public static final String SECURITY_AUTH_KERBEROS = "security_auth_kerberos";
public static final String SECURITY_DELEGATION_TOKEN = "security_delegation_token";
public static final String SECURITY_AUTH_ZOOKEEPER = "security_auth_zk";

public static final String STATE_BACKEND_ROCKSDB = "state_backend_rocksdb";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ public class SecurityOptions {
+ " (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for"
+ " Kafka authentication)");

/** @deprecated Use {@link #DELEGATION_TOKENS_ENABLED}. */
@Deprecated
@Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
public static final ConfigOption<Boolean> KERBEROS_FETCH_DELEGATION_TOKEN =
key("security.kerberos.fetch.delegation-token")
Expand All @@ -132,6 +134,8 @@ public class SecurityOptions {
.withDescription(
"The time period when keytab login happens automatically in order to always have a valid TGT.");

/** @deprecated Use {@link #DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF}. */
@Deprecated
@Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
public static final ConfigOption<Duration> KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF =
key("security.kerberos.tokens.renewal.retry.backoff")
Expand All @@ -140,6 +144,8 @@ public class SecurityOptions {
.withDescription(
"The time period how long to wait before retrying to obtain new delegation tokens after a failure.");

/** @deprecated Use {@link #DELEGATION_TOKENS_RENEWAL_TIME_RATIO}. */
@Deprecated
@Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
public static final ConfigOption<Double> KERBEROS_TOKENS_RENEWAL_TIME_RATIO =
key("security.kerberos.tokens.renewal.time-ratio")
Expand All @@ -160,6 +166,37 @@ public class SecurityOptions {
+ "security.kerberos.access.hadoopFileSystems=hdfs://namenode2:9002,hdfs://namenode3:9003. "
+ "The JobManager needs to have access to these filesystems to retrieve the security tokens.");

// ------------------------------------------------------------------------
// Delegation Token Options
// ------------------------------------------------------------------------

@Documentation.Section(Documentation.Sections.SECURITY_DELEGATION_TOKEN)
public static final ConfigOption<Boolean> DELEGATION_TOKENS_ENABLED =
key("security.delegation.tokens.enabled")
.booleanType()
.defaultValue(true)
.withDeprecatedKeys(KERBEROS_FETCH_DELEGATION_TOKEN.key())
.withDescription(
"Indicates whether to start delegation tokens system for external services.");

@Documentation.Section(Documentation.Sections.SECURITY_DELEGATION_TOKEN)
public static final ConfigOption<Duration> DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF =
key("security.delegation.tokens.renewal.retry.backoff")
.durationType()
.defaultValue(Duration.ofHours(1))
.withDeprecatedKeys(KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF.key())
.withDescription(
"The time period how long to wait before retrying to obtain new delegation tokens after a failure.");

@Documentation.Section(Documentation.Sections.SECURITY_DELEGATION_TOKEN)
public static final ConfigOption<Double> DELEGATION_TOKENS_RENEWAL_TIME_RATIO =
key("security.delegation.tokens.renewal.time-ratio")
.doubleType()
.defaultValue(0.75)
.withDeprecatedKeys(KERBEROS_TOKENS_RENEWAL_TIME_RATIO.key())
.withDescription(
"Ratio of the tokens's expiration time when new credentials should be re-obtained.");

// ------------------------------------------------------------------------
// ZooKeeper Security Options
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.security.contexts.SecurityContext;
import org.apache.flink.runtime.security.token.DefaultDelegationTokenManagerFactory;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.runtime.security.token.hadoop.KerberosDelegationTokenManagerFactory;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.webmonitor.retriever.impl.RpcMetricQueryServiceRetriever;
import org.apache.flink.util.AutoCloseableAsync;
Expand Down Expand Up @@ -389,11 +389,8 @@ protected void initializeServices(Configuration configuration, PluginManager plu
configuration.setString(BlobServerOptions.PORT, String.valueOf(blobServer.getPort()));
heartbeatServices = createHeartbeatServices(configuration);
delegationTokenManager =
KerberosDelegationTokenManagerFactory.create(
getClass().getClassLoader(),
configuration,
commonRpcService.getScheduledExecutor(),
ioExecutor);
DefaultDelegationTokenManagerFactory.create(
configuration, commonRpcService.getScheduledExecutor(), ioExecutor);
metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);

final RpcService metricQueryServiceRpcService =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.security.token.DefaultDelegationTokenManagerFactory;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.runtime.security.token.hadoop.KerberosDelegationTokenManagerFactory;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
Expand Down Expand Up @@ -428,11 +428,8 @@ public void start() throws Exception {
heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

delegationTokenManager =
KerberosDelegationTokenManagerFactory.create(
getClass().getClassLoader(),
configuration,
commonRpcService.getScheduledExecutor(),
ioExecutor);
DefaultDelegationTokenManagerFactory.create(
configuration, commonRpcService.getScheduledExecutor(), ioExecutor);

blobCacheService =
BlobUtils.createBlobCacheService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@
* limitations under the License.
*/

package org.apache.flink.runtime.security.token.hadoop;
package org.apache.flink.runtime.security.token;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.security.token.DelegationTokenContainer;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.runtime.security.token.DelegationTokenProvider;
import org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenUpdater;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.concurrent.ScheduledExecutor;
Expand All @@ -44,23 +42,23 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF;
import static org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_TIME_RATIO;
import static org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF;
import static org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_TIME_RATIO;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* Manager for delegation tokens in a Flink cluster.
*
* <p>When delegation token renewal is enabled, this manager will make sure long-running apps can
* run without interruption while accessing secured services. It periodically logs in to the KDC
* with user-provided credentials, and contacts all the configured secure services to obtain
* delegation tokens to be distributed to the rest of the application.
* run without interruption while accessing secured services. It periodically contacts all the
* configured secure services to obtain delegation tokens to be distributed to the rest of the
* application.
*/
@Internal
public class KerberosDelegationTokenManager implements DelegationTokenManager {
public class DefaultDelegationTokenManager implements DelegationTokenManager {

private static final Logger LOG = LoggerFactory.getLogger(KerberosDelegationTokenManager.class);
private static final Logger LOG = LoggerFactory.getLogger(DefaultDelegationTokenManager.class);

private final Configuration configuration;

Expand All @@ -82,14 +80,14 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {

@Nullable private Listener listener;

public KerberosDelegationTokenManager(
public DefaultDelegationTokenManager(
Configuration configuration,
@Nullable ScheduledExecutor scheduledExecutor,
@Nullable ExecutorService ioExecutor) {
this.configuration = checkNotNull(configuration, "Flink configuration must not be null");
this.tokensRenewalTimeRatio = configuration.get(KERBEROS_TOKENS_RENEWAL_TIME_RATIO);
this.tokensRenewalTimeRatio = configuration.get(DELEGATION_TOKENS_RENEWAL_TIME_RATIO);
this.renewalRetryBackoffPeriod =
configuration.get(KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis();
configuration.get(DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis();
this.delegationTokenProviders = loadProviders();
this.scheduledExecutor = scheduledExecutor;
this.ioExecutor = ioExecutor;
Expand Down Expand Up @@ -120,13 +118,10 @@ private Map<String, DelegationTokenProvider> loadProviders() {
provider.serviceName());
}
} catch (Exception | NoClassDefFoundError e) {
LOG.error(
LOG.warn(
"Failed to initialize delegation token provider {}",
provider.serviceName(),
e);
if (!(e instanceof NoClassDefFoundError)) {
throw new FlinkRuntimeException(e);
}
}
}

Expand All @@ -138,7 +133,7 @@ private Map<String, DelegationTokenProvider> loadProviders() {
@VisibleForTesting
boolean isProviderEnabled(String serviceName) {
return configuration.getBoolean(
String.format("security.kerberos.token.provider.%s.enabled", serviceName), true);
String.format("security.delegation.token.provider.%s.enabled", serviceName), true);
}

@VisibleForTesting
Expand Down
Loading

0 comments on commit 37ad343

Please sign in to comment.