[FLINK-3929] Added Keytab based Kerberos support to enable secure Flink cluster deployment (addresses HDFS, Kafka and ZK services)

FLINK-3929 Added MiniKDC support for Kafka, Zookeeper, RollingFS and Yarn integration test modules
vijikarthi authored and mxm committed Sep 20, 2016
1 parent 303f6fe commit 25a622f
Showing 55 changed files with 2,553 additions and 226 deletions.
87 changes: 87 additions & 0 deletions docs/internals/flink_security.md
@@ -0,0 +1,87 @@
---
title: "Flink Security"
# Top navigation
top-nav-group: internals
top-nav-pos: 10
top-nav-title: Flink Security
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

This document briefly describes how Flink security works in the context of various deployment mechanisms (standalone cluster vs. YARN)
and the connectors that participate in a Flink job's execution. It can be helpful for both administrators and developers
who plan to run Flink in a secure environment.

## Objective

The primary goal of the Flink security model is to enable secure data access for jobs within a cluster via connectors. In a production deployment,
streaming jobs are understood to run for long periods of time (days/weeks/months), and the system must be able to authenticate against secure
data sources throughout the life of the job. The current implementation supports running Flink clusters (JobManager/TaskManager/jobs) under the
context of a Kerberos identity, based on a keytab credential supplied at deployment time. Any job submitted will continue to run with the identity of the cluster.

## How Flink Security Works
A Flink deployment includes running the JobManager/ZooKeeper, TaskManager(s), the Web UI and job(s). Jobs (user code) can be submitted through the Web UI and/or the CLI.
A job program may use one or more connectors (Kafka, HDFS, Cassandra, Flume, Kinesis, etc.), and each connector may have specific security
requirements (Kerberos, database-based, SSL/TLS, custom, etc.). While support for the security requirements of all connectors will evolve over time,
at the time of writing the following connectors/services have been tested for Kerberos/keytab-based security:

- Kafka (0.9)
- HDFS
- ZooKeeper

Hadoop uses the UserGroupInformation (UGI) class to manage security. UGI is a static implementation that takes care of handling Kerberos authentication. The Flink bootstrap implementation
(JM/TM/CLI) takes care of instantiating UGI with the appropriate security credentials to establish the necessary security context.
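
For illustration, a minimal sketch of such a UGI bootstrap (the principal and keytab path are placeholders; this is not Flink's actual bootstrap code):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class UgiBootstrapSketch {
    public static void main(String[] args) throws Exception {
        Configuration hadoopConf = new Configuration();
        // Switch Hadoop from simple (OS user) authentication to Kerberos.
        hadoopConf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(hadoopConf);

        // Log in from the keytab; UGI caches the credentials process-wide.
        UserGroupInformation.loginUserFromKeytab(
                "flink-user@EXAMPLE.COM", "/path/to/flink.keytab");
        System.out.println("Logged in as " + UserGroupInformation.getLoginUser());
    }
}
```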

Services like Kafka and ZooKeeper use a SASL/JAAS-based authentication mechanism to authenticate against a Kerberos server. They expect a JAAS configuration with a platform-specific login
module *name* to be provided. Managing per-connector configuration files would be an overhead; to avoid this, a process-wide JAAS configuration object is
instantiated which serves a standard AppConfigurationEntry to the connectors that authenticate using the SASL/JAAS mechanism.
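
A sketch of what such a process-wide JAAS configuration object could look like (the entry options shown are illustrative assumptions, not Flink's exact implementation):

```java
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

public class ProcessWideJaasSketch {
    public static void install(final String keytab, final String principal) {
        Configuration.setConfiguration(new Configuration() {
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
                Map<String, String> options = new HashMap<>();
                options.put("useKeyTab", "true");
                options.put("storeKey", "true");
                options.put("keyTab", keytab);
                options.put("principal", principal);
                // The same Kerberos entry is served no matter which login
                // module *name* the connector asks for (e.g. "KafkaClient"
                // for Kafka or "Client" for ZooKeeper).
                return new AppConfigurationEntry[] {
                    new AppConfigurationEntry(
                        "com.sun.security.auth.module.Krb5LoginModule",
                        AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
                        options)
                };
            }
        });
    }
}
```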

It is important to understand that the Flink processes (JM/TM/UI/jobs) themselves use UGI's doAs() implementation to run under a specific user context: if Hadoop security is enabled,
the Flink processes run under a secure user account; otherwise they run as the OS login user account that starts the Flink cluster.
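
A sketch of the doAs() pattern (the work inside the action is a placeholder):

```java
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.security.UserGroupInformation;

public class DoAsSketch {
    public static void main(String[] args) throws Exception {
        // getLoginUser() returns the Kerberos identity if Hadoop security is
        // enabled, otherwise the OS login user.
        UserGroupInformation ugi = UserGroupInformation.getLoginUser();
        ugi.doAs(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
                // JM/TM/CLI work goes here; Hadoop calls made in this scope
                // run under the user context established above.
                return null;
            }
        });
    }
}
```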

## Security Configurations

Secure credentials can be supplied by adding the following configuration elements to the Flink configuration file:

- `security.keytab`: Absolute path to Kerberos keytab file that contains the user credentials/secret.

- `security.principal`: User principal name that the Flink cluster should run as.

The delegation token mechanism (*kinit cache*) is still supported for backward compatibility, but enabling security using the *keytab* configuration is the preferred and recommended approach.

## Standalone Mode

Steps to run a secure Flink cluster in standalone/cluster mode:
- Add the security configurations to the Flink configuration file (on all cluster nodes)
- Make sure the keytab file exists at the path indicated by the *security.keytab* configuration on all cluster nodes
- Deploy the Flink cluster using the cluster start/stop scripts or the CLI

## YARN Mode

Steps to run a secure Flink cluster in YARN mode:
- Add the security configurations to the Flink configuration file (on the node from which the cluster will be provisioned using the Flink/YARN CLI)
- Make sure the keytab file exists at the path indicated by the *security.keytab* configuration
- Deploy the Flink cluster using the CLI

In YARN mode, the user-supplied keytab is copied over to the YARN containers (App Master/JM and TM) as a YARN local resource file.
Security implementation details are based on <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md">YARN security</a>.
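
For background, a sketch of the general YARN local-resource mechanism used to ship such a file into containers (standard Hadoop 2.x YARN API; this is an illustration, not Flink's exact code):

```java
import java.util.Map;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;

public class KeytabLocalResourceSketch {
    // Registers an already-uploaded keytab as a YARN local resource so that
    // it is shipped into each container's working directory.
    public static void register(
            FileSystem fs, Path remoteKeytab,
            Map<String, LocalResource> localResources) throws Exception {
        FileStatus status = fs.getFileStatus(remoteKeytab);
        LocalResource resource = Records.newRecord(LocalResource.class);
        resource.setResource(ConverterUtils.getYarnUrlFromPath(remoteKeytab));
        resource.setSize(status.getLen());
        resource.setTimestamp(status.getModificationTime());
        resource.setType(LocalResourceType.FILE);
        resource.setVisibility(LocalResourceVisibility.APPLICATION);
        localResources.put(remoteKeytab.getName(), resource);
    }
}
```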

## Token Renewal

The UGI and Kafka/ZK login module implementations take care of auto-renewing the tickets upon expiry; no further action is needed on the part of Flink.
27 changes: 25 additions & 2 deletions docs/setup/config.md
@@ -95,18 +95,35 @@ These options are useful for debugging a Flink application for memory and garbage collection

### Kerberos

Flink supports Kerberos authentication of Hadoop services such as HDFS, YARN, or HBase.
Flink supports Kerberos authentication for the following services:

+ Hadoop components, such as HDFS, YARN, or HBase
+ Kafka connectors (version 0.9+)
+ ZooKeeper server/client

Hadoop components rely on the UserGroupInformation (UGI) implementation to handle Kerberos authentication, whereas the Kafka and ZooKeeper services handle Kerberos authentication through a SASL/JAAS implementation.

**Kerberos is only properly supported in Hadoop version 2.6.1 and above. All
other versions have critical bugs which might fail the Flink job
unexpectedly.**

**Ticket cache** and **Keytab** modes are supported for all of the above-mentioned services.

> Ticket cache (supported only for backward compatibility; keytab is the preferred approach for long-running jobs)
While Hadoop uses Kerberos tickets to authenticate users with services initially, the authentication process continues differently afterwards. Instead of saving the ticket to authenticate on a later access, Hadoop creates its own security tokens (DelegationToken) that it passes around. These are authenticated to Kerberos periodically but are independent of the token renewal time. The tokens have a maximum life span identical to the Kerberos ticket maximum life span.

Please make sure to set the maximum ticket life span high for long-running jobs. The renewal time of the ticket, on the other hand, is not important, because Hadoop abstracts this away using its own security token renewal system. Hadoop makes sure that tickets are renewed in time and you can be sure to be authenticated until the end of the ticket life time.
While using ticket cache mode, please make sure to set the maximum ticket life span high for long-running jobs.

If you are on YARN, then it is sufficient to authenticate the client with Kerberos. On a Flink standalone cluster you need to ensure that, initially, all nodes are authenticated with Kerberos using the `kinit` tool.

> Keytab (the security principal and keytab can be configured through the Flink configuration file)
- `security.keytab`: Path to the keytab file
- `security.principal`: Principal associated with the keytab

Kerberos ticket renewal is abstracted and automatically handled by the Hadoop/Kafka/ZK login modules, which ensure that tickets are renewed in time; you can be sure to be authenticated until the end of the ticket lifetime.

For Kafka and ZooKeeper, a process-wide JAAS config is created using the provided security credentials, and the Kerberos authentication is handled by the Kafka/ZK login handlers.

### Other

@@ -315,6 +332,12 @@ Previously this key was named `recovery.mode` and the default value was `standalone`.

- `high-availability.job.delay`: (Default `akka.ask.timeout`) Defines the delay before persisted jobs are recovered in case of a master recovery situation. Previously this key was named `recovery.job.delay`.

### ZooKeeper-Security

- `zookeeper.sasl.disable`: (Default: `true`) Defines whether SASL-based authentication for ZooKeeper is disabled. Set the value to `false` if the ZooKeeper cluster is running in secure mode (Kerberos).

- `zookeeper.sasl.service-name`: (Default: `zookeeper`) If the ZooKeeper server is configured with a different service name, it can be supplied using this configuration. A mismatch in service name between the client and server configuration will cause authentication to fail.
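
For reference, a sketch of how these keys might be mapped onto ZooKeeper's standard client system properties (`zookeeper.sasl.client` and `zookeeper.sasl.client.username` are ZooKeeper client properties; the mapping shown is illustrative, not Flink's exact code):

```java
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

public class ZooKeeperSaslSketch {
    public static void apply(Configuration flinkConf) {
        boolean saslDisabled = flinkConf.getBoolean(
                ConfigConstants.ZOOKEEPER_SASL_DISABLE,
                ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE);
        // ZooKeeper's client enables SASL when zookeeper.sasl.client=true.
        System.setProperty("zookeeper.sasl.client", String.valueOf(!saslDisabled));

        // Must match the service name the ZooKeeper server runs under.
        String serviceName = flinkConf.getString(
                ConfigConstants.ZOOKEEPER_SASL_SERVICE_NAME, "zookeeper");
        System.setProperty("zookeeper.sasl.client.username", serviceName);
    }
}
```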

## Environment

- `env.log.dir`: (Defaults to the `log` directory under Flink's home) Defines the directory where the Flink logs are saved. It has to be an absolute path.
@@ -66,7 +66,7 @@
import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.security.SecurityContext;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
@@ -161,6 +161,9 @@ public CliFrontend(String configDir) throws Exception {
"filesystem scheme from configuration.", e);
}

this.config.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDirectory.getAbsolutePath()
+ ".." + File.separator);

this.clientTimeout = AkkaUtils.getClientTimeout(config);
}

@@ -982,25 +985,7 @@ public int parseParameters(String[] args) {
// do action
switch (action) {
case ACTION_RUN:
// run() needs to run in a secured environment for the optimizer.
if (SecurityUtils.isSecurityEnabled()) {
String message = "Secure Hadoop environment setup detected. Running in secure context.";
LOG.info(message);

try {
return SecurityUtils.runSecured(new SecurityUtils.FlinkSecuredRunner<Integer>() {
@Override
public Integer run() throws Exception {
return CliFrontend.this.run(params);
}
});
}
catch (Exception e) {
return handleError(e);
}
} else {
return run(params);
}
return CliFrontend.this.run(params);
case ACTION_LIST:
return list(params);
case ACTION_INFO:
@@ -1037,12 +1022,19 @@ public Integer run() throws Exception {
/**
* Submits the job based on the arguments
*/
public static void main(String[] args) {
public static void main(final String[] args) {
EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

try {
CliFrontend cli = new CliFrontend();
int retCode = cli.parseParameters(args);
final CliFrontend cli = new CliFrontend();
SecurityContext.install(new SecurityContext.SecurityConfiguration().setFlinkConfiguration(cli.config));
int retCode = SecurityContext.getInstalled()
.runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
@Override
public Integer run() {
return cli.parseParameters(args);
}
});
System.exit(retCode);
}
catch (Throwable t) {
@@ -758,6 +758,12 @@ public final class ConfigConstants {
@PublicEvolving
public static final String HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS = "high-availability.zookeeper.client.max-retry-attempts";

@PublicEvolving
public static final String ZOOKEEPER_SASL_DISABLE = "zookeeper.sasl.disable";

@PublicEvolving
public static final String ZOOKEEPER_SASL_SERVICE_NAME = "zookeeper.sasl.service-name";

/** Deprecated in favour of {@link #HA_ZOOKEEPER_QUORUM_KEY}. */
@Deprecated
public static final String ZOOKEEPER_QUORUM_KEY = "recovery.zookeeper.quorum";
@@ -1233,6 +1239,9 @@ public final class ConfigConstants {
/** ZooKeeper default leader port. */
public static final int DEFAULT_ZOOKEEPER_LEADER_PORT = 3888;

/** Defaults for ZK client security **/
public static final boolean DEFAULT_ZOOKEEPER_SASL_DISABLE = true;

// ------------------------- Queryable state ------------------------------

/** Port to bind KvState server to. */
@@ -1279,6 +1288,19 @@
/** The environment variable name which contains the location of the lib folder */
public static final String ENV_FLINK_LIB_DIR = "FLINK_LIB_DIR";

// -------------------------------- Security -------------------------------

/*
 * The config parameters defining security credentials required
 * for securing a Flink cluster.
 */

/** Keytab file key name to be used in flink configuration file */
public static final String SECURITY_KEYTAB_KEY = "security.keytab";

/** Kerberos security principal key name to be used in flink configuration file */
public static final String SECURITY_PRINCIPAL_KEY = "security.principal";
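
// Illustrative usage sketch (hypothetical, not part of this file): a security
// bootstrap could decide between keytab and ticket-cache mode from the two
// keys above, e.g.:
//
//   boolean useKeytab =
//       config.getString(SECURITY_KEYTAB_KEY, null) != null
//           && config.getString(SECURITY_PRINCIPAL_KEY, null) != null;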


/**
* Not instantiable.
@@ -293,7 +293,7 @@ private static String format(@Nullable String template, @Nullable Object... args)

return builder.toString();
}

// ------------------------------------------------------------------------

/** Private constructor to prevent instantiation */
26 changes: 26 additions & 0 deletions flink-dist/src/main/flink-bin/conf/flink-jaas.conf
@@ -0,0 +1,26 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# We are using this file as a workaround for the Kafka and ZK SASL implementation
# since they explicitly look for java.security.auth.login.config property
# The file itself is not used by the application since the internal implementation
# uses a process-wide in-memory java security configuration object.
# Please do not edit/delete this file - See FLINK-3929
sample {
useKeyTab=false
useTicketCache=true;
};
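# Illustrative note: Kafka/ZK locate this placeholder through the standard JVM
# property, roughly as follows (the path is an assumption for illustration):
#   System.setProperty("java.security.auth.login.config",
#           "/path/to/conf/flink-jaas.conf");
# The effective login entries are served by the in-memory
# javax.security.auth.login.Configuration object instead.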
25 changes: 25 additions & 0 deletions flink-dist/src/main/resources/flink-conf.yaml
@@ -139,6 +139,31 @@ jobmanager.web.port: 8081
#
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
#

# high-availability: zookeeper
# high-availability.zookeeper.quorum: localhost:2181
# high-availability.zookeeper.storageDir: hdfs:///flink/ha/

#==============================================================================
# Flink Cluster Security Configuration (optional configuration)
#==============================================================================

# Kerberos security for the connectors can be enabled by providing the configurations below.
# Security works in two modes: a keytab/principal combination, or the Kerberos ticket cache.
# If keytab and principal are not provided, the ticket cache (manual kinit) will be used.

#security.keytab: /path/to/kerberos/keytab
#security.principal: flink-user

#==============================================================================
# ZK Security Configuration (optional configuration)
#==============================================================================
# The below configurations are applicable if the ZK quorum is configured for Kerberos security

# SASL authentication is disabled by default and can be enabled by changing the value to false
#
# zookeeper.sasl.disable: true

# Override the below configuration to provide a custom ZK service name if configured
#
# zookeeper.sasl.service-name: zookeeper
@@ -582,10 +582,11 @@ public static Protos.TaskInfo.Builder createTaskManagerContext(
// build the launch command
boolean hasLogback = new File(workingDirectory, "logback.xml").exists();
boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists();
boolean hasKrb5 = false;

String launchCommand = BootstrapTools.getTaskManagerShellCommand(
flinkConfig, tmParams.containeredParameters(), ".", ".",
hasLogback, hasLog4j, taskManagerMainClass);
hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);
cmd.setValue(launchCommand);

// build the environment variables
@@ -308,6 +308,7 @@ public static String getTaskManagerShellCommand(
String logDirectory,
boolean hasLogback,
boolean hasLog4j,
boolean hasKrb5,
Class<?> mainClass) {

StringBuilder tmCommand = new StringBuilder("$JAVA_HOME/bin/java");
@@ -328,6 +329,12 @@
tmCommand.append(" -Dlog4j.configuration=file:")
.append(configDirectory).append("/log4j.properties");
}

// applicable only for the YarnMiniCluster secure test run
// the krb5.conf file will be available as a local resource in the JM/TM container
if (hasKrb5) {
tmCommand.append(" -Djava.security.krb5.conf=krb5.conf");
}
}

tmCommand.append(' ').append(mainClass.getName());
