[FLINK-17480][kubernetes][python] Support running PyFlink on Kubernetes
This closes apache#13322.
shuiqiangchen authored and dianfu committed Sep 30, 2020
1 parent d999126 commit 7050c04
Showing 15 changed files with 486 additions and 21 deletions.
95 changes: 92 additions & 3 deletions docs/ops/deployment/native_kubernetes.md
@@ -76,7 +76,12 @@ Please follow our [configuration guide]({{ site.baseurl }}/ops/config.html) if y

If you do not specify a particular name for your session by `kubernetes.cluster-id`, the Flink client will generate a UUID name.

<span class="label label-info">Note</span> A docker image with Python and PyFlink installed is required if you are going to start a session cluster for Python Flink Jobs.
Please refer to the following [section](#custom-flink-docker-image).

### Custom Flink Docker image
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">

If you want to use a custom Docker image to deploy Flink containers, check [the Flink Docker image documentation](docker.html),
[its tags](docker.html#image-tags), [how to customize the Flink Docker image](docker.html#customize-flink-image) and [enable plugins](docker.html#using-plugins).
@@ -91,14 +96,61 @@ $ ./bin/kubernetes-session.sh \
-Dresourcemanager.taskmanager-timeout=3600000 \
-Dkubernetes.container.image=<CustomImageName>
{% endhighlight %}
</div>

<div data-lang="python" markdown="1">
To build a custom image with Python and PyFlink installed, you can refer to the following Dockerfile:
{% highlight Dockerfile %}
FROM flink

# install python3 and pip3
RUN apt-get update -y && \
apt-get install -y python3.7 python3-pip python3.7-dev && rm -rf /var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python

# install Python Flink
RUN pip3 install apache-flink
{% endhighlight %}

Build the image and tag it as **pyflink:latest**:

{% highlight bash %}
sudo docker build -t pyflink:latest .
{% endhighlight %}

Then you can start a PyFlink session cluster by setting the [`kubernetes.container.image`](../config.html#kubernetes-container-image)
configuration option to the name of the custom image:

{% highlight bash %}
$ ./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=<ClusterId> \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dresourcemanager.taskmanager-timeout=3600000 \
-Dkubernetes.container.image=pyflink:latest
{% endhighlight %}
</div>

</div>

### Submitting jobs to an existing Session

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
Use the following command to submit a Flink job to the Kubernetes cluster.

{% highlight bash %}
$ ./bin/flink run -d -t kubernetes-session -Dkubernetes.cluster-id=<ClusterId> examples/streaming/WindowJoin.jar
{% endhighlight %}
</div>

<div data-lang="python" markdown="1">
Use the following command to submit a PyFlink job to the Kubernetes cluster.
{% highlight bash %}
$ ./bin/flink run -d -t kubernetes-session -Dkubernetes.cluster-id=<ClusterId> -pym scala_function -pyfs examples/python/table/udf
{% endhighlight %}
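
The `-pym` and `-pyfs` values above point at the `scala_function` example module shipped under `examples/python/table/udf`. To submit a job of your own with `-py` instead, a script along the following lines would do. This is a minimal sketch with hypothetical names (`my_job.py`, the `sink` table); it is not part of this commit and assumes a PyFlink version in which `TableResult.wait()` is available (1.12+):

{% highlight python %}
# my_job.py: a minimal PyFlink Table API job (illustrative sketch)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(env, environment_settings=settings)

# declare a print sink via DDL and write a small in-memory table into it
t_env.execute_sql("""
    CREATE TABLE sink (
        id BIGINT,
        word STRING
    ) WITH ('connector' = 'print')
""")

t_env.from_elements([(1, 'hi'), (2, 'hello')], ['id', 'word']) \
    .execute_insert('sink') \
    .wait()  # blocks until the job finishes (TableResult.wait(), PyFlink 1.12+)
{% endhighlight %}

Such a script would be submitted with `./bin/flink run -d -t kubernetes-session -Dkubernetes.cluster-id=<ClusterId> -py my_job.py`.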
</div>
</div>

### Accessing Job Manager UI

@@ -155,9 +207,9 @@ $ kubectl delete deployment/<ClusterID>
## Flink Kubernetes Application

### Start Flink Application

<div class="codetabs" markdown="1">
Application mode allows users to create a single image containing their job and the Flink runtime; the cluster components will be created and destroyed automatically as needed. The Flink community provides base Docker images that can be [customized](docker.html#customize-flink-image) for any use case.

<div data-lang="java" markdown="1">
{% highlight dockerfile %}
FROM flink
RUN mkdir -p $FLINK_HOME/usrlib
@@ -174,7 +226,44 @@ $ ./bin/flink run-application -p 8 -t kubernetes-application \
-Dkubernetes.container.image=<CustomImageName> \
local:///opt/flink/usrlib/my-flink-job.jar
{% endhighlight %}
</div>

<div data-lang="python" markdown="1">
{% highlight dockerfile %}
FROM flink

# install python3 and pip3
RUN apt-get update -y && \
apt-get install -y python3.7 python3-pip python3.7-dev && rm -rf /var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python

# install Python Flink
RUN pip3 install apache-flink
COPY /path/of/python/codes /opt/python_codes

# if there are third-party Python dependencies, they can be installed when building the image
COPY /path/to/requirements.txt /opt/requirements.txt
RUN pip3 install -r /opt/requirements.txt

# if the job requires external Java dependencies, they should be built into the image as well
RUN mkdir -p $FLINK_HOME/usrlib
COPY /path/of/external/jar/dependencies $FLINK_HOME/usrlib/
{% endhighlight %}

Use the following command to start a PyFlink application, assuming the application image name is **my-pyflink-app:latest**.
{% highlight bash %}
$ ./bin/flink run-application -p 8 -t kubernetes-application \
-Dkubernetes.cluster-id=<ClusterId> \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.container.image=my-pyflink-app:latest \
-pym <ENTRY_MODULE_NAME> (or -py /opt/python_codes/<ENTRY_FILE_NAME>) -pyfs /opt/python_codes
{% endhighlight %}
You can specify the path of the Python entry script with `-py`, the entry module name with `-pym`, the paths
of the Python files in the image with `-pyfs`, and other options supported by `flink run`.
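
For illustration, the module named by `-pym` is an ordinary Python module reachable through the `-pyfs` paths; `-pym` should run it the way `python -m` does, so the usual `__main__` guard applies. A hypothetical `/opt/python_codes/word_count.py` (an assumption for this sketch, not part of this commit) might look like:

{% highlight python %}
# /opt/python_codes/word_count.py: hypothetical entry module, submitted with
#   -pym word_count -pyfs /opt/python_codes
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment


def run():
    env = StreamExecutionEnvironment.get_execution_environment()
    settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    t_env = StreamTableEnvironment.create(env, environment_settings=settings)

    # a print sink stands in for whatever sink the real job writes to
    t_env.execute_sql("CREATE TABLE sink (word STRING) WITH ('connector' = 'print')")
    t_env.from_elements([('hi',), ('hello',)], ['word']).execute_insert('sink')


if __name__ == '__main__':  # -pym executes the module as __main__, so the guard fires
    run()
{% endhighlight %}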
</div>
</div>
Note: Only "local" is supported as schema for application mode. This assumes that the jar is located in the image, not the Flink client.

Note: All the jars in the "$FLINK_HOME/usrlib" directory in the image will be added to the user classpath.
91 changes: 91 additions & 0 deletions docs/ops/deployment/native_kubernetes.zh.md
@@ -74,7 +74,12 @@ $ ./bin/kubernetes-session.sh \

If you do not specify a particular name for your session via `kubernetes.cluster-id`, the Flink client will generate a UUID name.

<span class="label label-info">注意</span> 如果要启动 session 集群运行 PyFlink 作业, 你需要提供一个安装有 Python 和 PyFlink 的镜像。
请参考下面的[章节](#custom-flink-docker-image).

### Custom Flink Docker image
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">

If you want to use a custom Docker image to deploy Flink containers, check [the Flink Docker image documentation](docker.html), [its tags](docker.html#image-tags), [how to customize the Flink Docker image](docker.html#customize-flink-image) and [enable plugins](docker.html#using-plugins).
If you have created a custom Docker image, you can specify it via the [`kubernetes.container.image`](../config.html#kubernetes-container-image) configuration option:
@@ -88,14 +93,60 @@ $ ./bin/kubernetes-session.sh \
-Dresourcemanager.taskmanager-timeout=3600000 \
-Dkubernetes.container.image=<CustomImageName>
{% endhighlight %}
</div>


<div data-lang="python" markdown="1">
To build a Docker image with Python and PyFlink installed, you can refer to the following Dockerfile:
{% highlight Dockerfile %}
FROM flink

# install python3 and pip3
RUN apt-get update -y && \
apt-get install -y python3.7 python3-pip python3.7-dev && rm -rf /var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python

# install PyFlink
RUN pip3 install apache-flink
{% endhighlight %}

Build the image and tag it as **pyflink:latest**:
{% highlight bash %}
sudo docker build -t pyflink:latest .
{% endhighlight %}
Then you can start a PyFlink session cluster by setting the [`kubernetes.container.image`](../config.html#kubernetes-container-image) configuration option to the name of the image just built:

{% highlight bash %}
$ ./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=<ClusterId> \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dresourcemanager.taskmanager-timeout=3600000 \
-Dkubernetes.container.image=pyflink:latest
{% endhighlight %}
</div>

</div>

### Submitting jobs to an existing Session
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">

Use the following command to submit a Flink job to the Kubernetes cluster.

{% highlight bash %}
$ ./bin/flink run -d -t kubernetes-session -Dkubernetes.cluster-id=<ClusterId> examples/streaming/WindowJoin.jar
{% endhighlight %}
</div>

<div data-lang="python" markdown="1">
Use the following command to submit a PyFlink job to the Kubernetes cluster.
{% highlight bash %}
$ ./bin/flink run -d -t kubernetes-session -Dkubernetes.cluster-id=<ClusterId> -pym scala_function -pyfs examples/python/table/udf
{% endhighlight %}
</div>
</div>

### Accessing Job Manager UI

@@ -152,9 +203,11 @@ $ kubectl delete deployment/<ClusterID>
## Flink Kubernetes Application

### Start Flink Application
<div class="codetabs" markdown="1">

Application mode allows users to create a single image containing their job and the Flink runtime; the cluster components will be created and destroyed automatically as needed. The Flink community provides base images that can be [customized](docker.html#customize-flink-image) for any use case.

<div data-lang="java" markdown="1">
{% highlight dockerfile %}
FROM flink
RUN mkdir -p $FLINK_HOME/usrlib
Expand All @@ -171,6 +224,44 @@ $ ./bin/flink run-application -p 8 -t kubernetes-application \
-Dkubernetes.container.image=<CustomImageName> \
local:///opt/flink/usrlib/my-flink-job.jar
{% endhighlight %}
</div>

<div data-lang="python" markdown="1">
{% highlight dockerfile %}
FROM flink

# install python3 and pip3
RUN apt-get update -y && \
apt-get install -y python3.7 python3-pip python3.7-dev && rm -rf /var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python

# install PyFlink
RUN pip3 install apache-flink
COPY /path/of/python/codes /opt/python_codes

# if there are third-party Python dependencies, they can be installed when building the image
COPY /path/to/requirements.txt /opt/requirements.txt
RUN pip3 install -r /opt/requirements.txt

# if the job requires third-party Java dependencies, they can also be put under ${FLINK_HOME}/usrlib when building the image
RUN mkdir -p $FLINK_HOME/usrlib
COPY /path/of/external/jar/dependencies $FLINK_HOME/usrlib/
{% endhighlight %}

Assuming the application image is named **my-pyflink-app:latest**, run the PyFlink application with the following command:
{% highlight bash %}
$ ./bin/flink run-application -p 8 -t kubernetes-application \
-Dkubernetes.cluster-id=<ClusterId> \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.container.image=my-pyflink-app:latest \
-pym <ENTRY_MODULE_NAME> (or -py /opt/python_codes/<ENTRY_FILE_NAME>) -pyfs /opt/python_codes
{% endhighlight %}
You can specify the entry script of the PyFlink application with `-py/--python`, or the entry module name with `-pym/--pyModule`; use `-pyfs/--pyFiles` to specify the paths of all the Python files, along with any other PyFlink job options supported by `flink run`.
</div>
</div>


Note: Only "local" is supported as a scheme in application mode. This means the jar must be located in the image, not on the Flink client.

@@ -183,15 +183,25 @@ protected void runApplication(String[] args) throws Exception {
		final CustomCommandLine activeCommandLine =
				validateAndGetActiveCommandLine(checkNotNull(commandLine));

		final ApplicationDeployer deployer =
				new ApplicationClusterDeployer(clusterClientServiceLoader);

		final ProgramOptions programOptions;
		final Configuration effectiveConfiguration;

		// No need to set a jarFile path for PyFlink jobs.
		if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {
			programOptions = ProgramOptionsUtils.createPythonProgramOptions(commandLine);
			effectiveConfiguration = getEffectiveConfiguration(
					activeCommandLine, commandLine, programOptions, Collections.emptyList());
		} else {
			programOptions = new ProgramOptions(commandLine);
			programOptions.validate();
			final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
			effectiveConfiguration = getEffectiveConfiguration(
					activeCommandLine, commandLine, programOptions, Collections.singletonList(uri.toString()));
		}

		final ApplicationConfiguration applicationConfiguration =
				new ApplicationConfiguration(programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
		deployer.run(effectiveConfiguration, applicationConfiguration);
@@ -18,13 +18,17 @@

package org.apache.flink.client.cli;

import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.net.URLClassLoader;
@@ -93,4 +97,24 @@ public static ProgramOptions createPythonProgramOptions(CommandLine line) throws
"or not working as expected.", e);
}
}

public static void configurePythonExecution(Configuration configuration, PackagedProgram packagedProgram) throws CliArgsException, NoSuchFieldException, IllegalAccessException {
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
final CommandLine commandLine = CliFrontendParser.parse(commandOptions, packagedProgram.getArguments(),
true);
final ProgramOptions programOptions = createPythonProgramOptions(commandLine);

//Extract real program args by eliminating the PyFlink dependency options
String[] programArgs = programOptions.extractProgramArgs(commandLine);
//Set the real program args to the packaged program
Field argsField = packagedProgram.getClass().getDeclaredField("args");
argsField.setAccessible(true);
argsField.set(packagedProgram, programArgs);

//PyFlink dependency configurations are set in the pythonConfiguration when constructing the program option,
// we need to get the python configuration and merge with the execution configuration.
Field pythonConfiguration = programOptions.getClass().getDeclaredField("pythonConfiguration");
pythonConfiguration.setAccessible(true);
configuration.addAll((Configuration) pythonConfiguration.get(programOptions));
}
}
@@ -18,8 +18,11 @@

package org.apache.flink.client.deployment.application;

import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.ProgramOptionsUtils;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
@@ -80,10 +83,16 @@ protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
		return new MemoryArchivedExecutionGraphStore();
	}

	protected static void configureExecution(final Configuration configuration, final PackagedProgram program) throws MalformedURLException, IllegalAccessException, NoSuchFieldException, CliArgsException {
		configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);
		ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, program.getJobJarAndDependencies(), URL::toString);
		ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, getClasspath(configuration, program), URL::toString);

		// If this is a PyFlink application, extract the Python dependencies from the program arguments
		// and add them to the execution configuration.
		if (PackagedProgramUtils.isPython(program.getMainClassName())) {
			ProgramOptionsUtils.configurePythonExecution(configuration, program);
		}
	}

private static List<URL> getClasspath(final Configuration configuration, final PackagedProgram program) throws MalformedURLException {
@@ -22,6 +22,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramRetriever;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
@@ -111,6 +112,19 @@ private Collection<URL> discoverUserClassPaths(@Nullable File jobDir) throws IOE
	@Override
	public PackagedProgram getPackagedProgram() throws FlinkException {
		try {
			// It is a Python job if the program arguments contain "-py/--python" or "-pym/--pyModule"; set the
			// fixed jobClassName and jarFile path.
			if (PackagedProgramUtils.isPython(jobClassName) || PackagedProgramUtils.isPython(programArguments)) {
				String pythonJobClassName = PackagedProgramUtils.getPythonDriverClassName();
				File pythonJarFile = new File(PackagedProgramUtils.getPythonJar().getPath());
				return PackagedProgram.newBuilder()
					.setUserClassPaths(new ArrayList<>(userClassPaths))
					.setArguments(programArguments)
					.setJarFile(pythonJarFile)
					.setEntryPointClassName(pythonJobClassName)
					.build();
			}

			if (jarFile != null) {
				return PackagedProgram.newBuilder()
					.setUserClassPaths(new ArrayList<>(userClassPaths))