[FLINK-28915][k8s] Support fetching remote job jar and additional dependencies on Kubernetes

Closes apache#24065.
ferenc-csaky authored and mbalassi committed Jan 25, 2024
1 parent a85bcb4 commit e63aa12
Showing 39 changed files with 1,021 additions and 489 deletions.
9 changes: 6 additions & 3 deletions docs/content.zh/docs/deployment/config.md
@@ -318,10 +318,13 @@ See the [History Server Docs]({{< ref "docs/deployment/advanced/historyserver" >
----
----

# Artifact Fetch
# Artifact Fetching

Flink can fetch user artifacts stored locally, on remote DFS, or accessible via an HTTP(S) endpoint.
{{< hint info >}}
**Note:** This is only supported in Standalone Application Mode and Native Kubernetes Application Mode.
{{< /hint >}}

*Artifact Fetch* is a features that Flink will fetch user artifact stored in DFS or download by HTTP/HTTPS.
Note that it is only supported in StandAlone Application Mode and Native Kubernetes Application Mode.
{{< generated/artifact_fetch_configuration >}}

----
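For illustration, a minimal sketch of fetching a remote job JAR in Native Kubernetes Application Mode; the bucket, image name, and base directory below are placeholder values, and an S3 filesystem plugin is assumed to be available in the image:

```bash
# Sketch only: bucket, image name and base directory are placeholders.
# The remote JAR is fetched into user.artifacts.base-dir before the job starts.
./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=custom-image-name \
    -Duser.artifacts.base-dir=/opt/flink/artifacts \
    s3://my-bucket/my-flink-job.jar
```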
@@ -107,26 +107,24 @@ $ ./bin/flink run-application \
# FileSystem
$ ./bin/flink run-application \
--target kubernetes-application \
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17-SNAPSHOT.jar \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image=custom-image-name \
s3://my-bucket/my-flink-job.jar

# Http/Https Schema
# HTTP(S)
$ ./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image=custom-image-name \
http://ip:port/my-flink-job.jar
https://ip:port/my-flink-job.jar
```
{{< hint info >}}
Now, The jar artifact supports downloading from the [flink filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) or Http/Https in Application Mode.
The jar package will be downloaded from filesystem to
[user.artifacts.base.dir]({{< ref "docs/deployment/config" >}}#user-artifacts-base-dir)/[kubernetes.namespace]({{< ref "docs/deployment/config" >}}#kubernetes-namespace)/[kubernetes.cluster-id]({{< ref "docs/deployment/config" >}}#kubernetes-cluster-id) path in image.
JAR fetching supports downloading from [filesystems]({{< ref "docs/deployment/filesystems/overview" >}}) or HTTP(S) in Application Mode.
The JAR will be downloaded to
[user.artifacts.base-dir]({{< ref "docs/deployment/config" >}}#user-artifacts-base-dir)/[kubernetes.namespace]({{< ref "docs/deployment/config" >}}#kubernetes-namespace)/[kubernetes.cluster-id]({{< ref "docs/deployment/config" >}}#kubernetes-cluster-id) path in image.
{{< /hint >}}
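For instance, assuming a base directory of `/opt/flink/artifacts` (a placeholder value here), the namespace `default`, and the cluster id `my-first-application-cluster`, the fetched JAR would land at a path like the one below:

```bash
# Sketch: shows where the fetched artifact ends up inside the image, assuming
# user.artifacts.base-dir=/opt/flink/artifacts (placeholder value),
# kubernetes.namespace=default and kubernetes.cluster-id=my-first-application-cluster.
ls /opt/flink/artifacts/default/my-first-application-cluster/
# my-flink-job.jar
```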

<span class="label label-info">Note</span> `local` schema is still supported. If you use `local` schema, the jar must be provided in the image or download by a init container like [Example]({{< ref "docs/deployment/resource-providers/native_kubernetes" >}}#example-of-pod-template).

<span class="label label-info">Note</span> `local` schema is still supported. If you use `local` schema, the JAR must be provided in the image or downloaded by an init container as described in [this example](#example-of-pod-template).

The `kubernetes.cluster-id` option specifies the cluster name and must be unique.
If you do not specify this option, then Flink will generate a random name.
@@ -348,7 +346,7 @@ $ kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edi
```

If you do not want to use the `default` service account, use the following command to create a new `flink-service-account` service account and set the role binding.
Then use the config option `-Dkubernetes.service-account=flink-service-account` to make the JobManager pod use the `flink-service-account` service account to create/delete TaskManager pods and leader ConfigMaps.
Then use the config option `-Dkubernetes.service-account=flink-service-account` to configure the JobManager pod's service account used to create and delete TaskManager pods and leader ConfigMaps.
Also this will allow the TaskManager to watch leader ConfigMaps to retrieve the address of JobManager and ResourceManager.

```bash
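# Sketch of the commands described above; the role-binding name is a placeholder,
# only the flink-service-account name is taken from the surrounding text.
kubectl create serviceaccount flink-service-account
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink-service-account
```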
@@ -121,7 +121,7 @@ The *job artifacts* are included into the class path of Flink's JVM process with
* all other necessary dependencies or resources, not included into Flink.

To deploy a cluster for a single job with Docker, you need to
* make *job artifacts* available locally in all containers under `/opt/flink/usrlib` or pass jar path by *jar-file* argument.
* make *job artifacts* available locally in all containers under `/opt/flink/usrlib`, or pass a list of jars via the `--jars` argument
* start a JobManager container in the *Application cluster* mode
* start the required number of TaskManager containers.

Expand Down Expand Up @@ -156,7 +156,6 @@ To make the **job artifacts available** locally in the container, you can
* **or extend the Flink image** by writing a custom `Dockerfile`, build it and use it for starting the JobManager and TaskManagers:
```dockerfile
FROM flink
ADD /host/path/to/job/artifacts/1 /opt/flink/usrlib/artifacts/1
@@ -175,36 +174,39 @@ To make the **job artifacts available** locally in the container, you can
$ docker run flink_with_job_artifacts taskmanager
```
* **or pass jar path by jar-file argument** when you start the JobManager:
* **or pass jar path by `jars` argument** when you start the JobManager:
```sh
$ FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"
$ docker network create flink-network

$ docker run \
--env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
--env ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17-SNAPSHOT.jar \
--env ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{< version >}}.jar \
--name=jobmanager \
--network flink-network \
flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}} standalone-job \
--job-classname com.job.ClassName \
--jar-file s3://my-bucket/my-flink-job.jar
--jars s3://my-bucket/my-flink-job.jar,s3://my-bucket/my-flink-udf.jar \
[--job-id <job id>] \
[--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] \
[job arguments]

```
The `standalone-job` argument starts a JobManager container in the Application Mode.
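A matching TaskManager can be started the same way; a minimal sketch assuming the same Docker network and `FLINK_PROPERTIES` as above:

```bash
# Sketch: starts one TaskManager that joins the JobManager created above.
# Add ENABLE_BUILT_IN_PLUGINS the same way if the TaskManager also needs a filesystem plugin.
docker run \
    --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
    --name=taskmanager \
    --network flink-network \
    flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}} taskmanager
```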
#### JobManager additional command line arguments
You can provide the following additional command line arguments to the cluster entrypoint:
* `--job-classname <job class name>`: Class name of the job to run.
* `--job-classname <job class name>` (optional): Class name of the job to run.
By default, Flink scans its class path for a JAR with a Main-Class or program-class manifest entry and chooses it as the job class.
Use this command line argument to manually set the job class.
{{< hint warning >}}
This argument is required in case that no or more than one JAR with such a manifest entry is available on the class path.
{{< /hint >}}
* `--job-id <job id>` (optional): Manually set a Flink job ID for the job (default: 00000000000000000000000000000000)
@@ -216,12 +218,12 @@ You can provide the following additional command line arguments to the cluster entrypoint:
* `--allowNonRestoredState` (optional): Skip broken savepoint state
Additionally you can specify this argument to allow that savepoint state is skipped which cannot be restored.
Additionally, you can specify this argument to allow that savepoint state is skipped which cannot be restored.
* `--jar-file` (optional): the path of jar artifact
* `--jars` (optional): the paths of the job jar and any additional artifact(s) separated by commas
You can specify this argument to point the job artifacts stored in [flink filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) or Http/Https. Flink will fetch it when deploy the job.
(e.g., s3://my-bucket/my-flink-job.jar).
You can use this argument to point to job artifacts stored in a [Flink filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) or downloadable via HTTP(S).
Flink will fetch these during job deployment (e.g. `--jars s3://my-bucket/my-flink-job.jar`, `--jars s3://my-bucket/my-flink-job.jar,s3://my-bucket/my-flink-udf.jar`).
If the main function of the user job main class accepts arguments, you can also pass them at the end of the `docker run` command.
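For example, a sketch with hypothetical job arguments `--input` and `--output` appended after the Flink options:

```bash
# Sketch: --input and --output are hypothetical arguments consumed by the user job's main() method.
docker run \
    --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
    --network flink-network \
    flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}} standalone-job \
    --job-classname com.job.ClassName \
    --jars s3://my-bucket/my-flink-job.jar \
    --input s3://my-bucket/input --output s3://my-bucket/output
```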
@@ -326,7 +328,7 @@ services:
image: flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}}
ports:
- "8081:8081"
command: standalone-job --job-classname com.job.ClassName [--job-id <job id>] [--fromSavepoint /path/to/savepoint] [--allowNonRestoredState] ["--jar-file" "/path/to/user-artifact"] [job arguments]
command: standalone-job --job-classname com.job.ClassName [--jars /path/to/artifact1,/path/to/artifact2] [--job-id <job id>] [--fromSavepoint /path/to/savepoint] [--allowNonRestoredState] [job arguments]
volumes:
- /host/path/to/job/artifacts:/opt/flink/usrlib
environment:
@@ -120,7 +120,7 @@ $ ./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar

* They can be obtained via the `job-artifacts-volume` in the [resource definition examples](#application-cluster-resource-definitions). If these components are created on a minikube cluster, the job-artifacts-volume in the example definitions can be mounted as a local directory of the host machine. If you are not using a minikube cluster, any other volume type available in the Kubernetes cluster can be used to provide the *job artifacts*.
* Build a [custom image]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#advanced-customization) that already contains the *job artifacts*.
* Provide the paths of *job artifacts* stored on DFS or downloadable via HTTP/HTTPS by specifying the [--jar file]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#jobmanager-additional-command-line-arguments) argument.
* Provide the paths of *job artifacts* stored on DFS or downloadable via HTTP/HTTPS by specifying the [--jars]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#jobmanager-additional-command-line-arguments) argument.

After creating the [common cluster components](#common-cluster-resource-definitions), specify the [Application cluster resource definition](#application-cluster-resource-definitions) files and run the `kubectl` command to start the Flink Application cluster:
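A sketch of those `kubectl` commands; the resource definition file names below are placeholders:

```bash
# Sketch: file names are placeholders for the Application cluster resource definitions referenced above.
kubectl create -f jobmanager-application.yaml
kubectl create -f taskmanager-job-deployment.yaml
```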

Expand Down Expand Up @@ -627,7 +627,7 @@ spec:
- name: jobmanager
image: apache/flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}}
env:
args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # 可选的参数项: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState", "--jar-file", "/path/to/user-artifact"]
args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # 可选的参数项: ["--job-id", "<job id>", "--jars", "/path/to/artifact1,/path/to/artifact2", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
ports:
- containerPort: 6123
name: rpc
Expand Down Expand Up @@ -686,7 +686,7 @@ spec:
apiVersion: v1
fieldPath: status.podIP
# The args below override the value of jobmanager.rpc.address in the config map with the value of POD_IP.
args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState", "--jar-file", "/path/to/user-artifact"]
args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--jars", "/path/to/artifact1,/path/to/artifact2", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
ports:
- containerPort: 6123
name: rpc
9 changes: 6 additions & 3 deletions docs/content/docs/deployment/config.md
@@ -320,10 +320,13 @@ See the [History Server Docs]({{< ref "docs/deployment/advanced/historyserver" >
----
----

# Artifact Fetch
# Artifact Fetching

Flink can fetch user artifacts stored locally, on remote DFS, or accessible via an HTTP(S) endpoint.
{{< hint info >}}
**Note:** This is only supported in Standalone Application Mode and Native Kubernetes Application Mode.
{{< /hint >}}

*Artifact Fetch* is a features that Flink will fetch user artifact stored in DFS or download by HTTP/HTTPS.
Note that it is only supported in StandAlone Application Mode and Native Kubernetes Application Mode.
{{< generated/artifact_fetch_configuration >}}

----
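As a complementary sketch, the same mechanism in Standalone Application Mode, using the `--jars` argument of the `standalone-job` entrypoint; the host name and artifact path are placeholders:

```bash
# Sketch: fetches the job JAR over HTTPS before the job starts; host, port and path are placeholders.
./bin/standalone-job.sh start \
    --job-classname com.job.ClassName \
    --jars https://artifact-host:8443/my-flink-job.jar
```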
@@ -119,19 +119,20 @@ $ ./bin/flink run-application \
-Dkubernetes.container.image=custom-image-name \
s3://my-bucket/my-flink-job.jar

# Http/Https Schema
# HTTP(S)
$ ./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image=custom-image-name \
http://ip:port/my-flink-job.jar
https://ip:port/my-flink-job.jar
```
{{< hint info >}}
Now, The jar artifact supports downloading from the [flink filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) or Http/Https in Application Mode.
The jar package will be downloaded from filesystem to
[user.artifacts.base.dir]({{< ref "docs/deployment/config" >}}#user-artifacts-base-dir)/[kubernetes.namespace]({{< ref "docs/deployment/config" >}}#kubernetes-namespace)/[kubernetes.cluster-id]({{< ref "docs/deployment/config" >}}#kubernetes-cluster-id) path in image.
JAR fetching supports downloading from [filesystems]({{< ref "docs/deployment/filesystems/overview" >}}) or HTTP(S) in Application Mode.
The JAR will be downloaded to
[user.artifacts.base-dir]({{< ref "docs/deployment/config" >}}#user-artifacts-base-dir)/[kubernetes.namespace]({{< ref "docs/deployment/config" >}}#kubernetes-namespace)/[kubernetes.cluster-id]({{< ref "docs/deployment/config" >}}#kubernetes-cluster-id) path in image.
{{< /hint >}}
<span class="label label-info">Note</span> `local` schema is still supported. If you use `local` schema, the jar must be provided in the image or download by a init container like [Example]({{< ref "docs/deployment/resource-providers/native_kubernetes" >}}#example-of-pod-template).

<span class="label label-info">Note</span> `local` schema is still supported. If you use `local` schema, the JAR must be provided in the image or downloaded by an init container as described in [this example](#example-of-pod-template).

The `kubernetes.cluster-id` option specifies the cluster name and must be unique.
If you do not specify this option, then Flink will generate a random name.
@@ -353,7 +354,7 @@ $ kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edi
```

If you do not want to use the `default` service account, use the following command to create a new `flink-service-account` service account and set the role binding.
Then use the config option `-Dkubernetes.service-account=flink-service-account` to make the JobManager pod use the `flink-service-account` service account to create/delete TaskManager pods and leader ConfigMaps.
Then use the config option `-Dkubernetes.service-account=flink-service-account` to configure the JobManager pod's service account used to create and delete TaskManager pods and leader ConfigMaps.
Also this will allow the TaskManager to watch leader ConfigMaps to retrieve the address of JobManager and ResourceManager.

```bash
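# Sketch of the commands described above; the role-binding name is a placeholder,
# only the flink-service-account name is taken from the surrounding text.
kubectl create serviceaccount flink-service-account
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink-service-account
```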
@@ -121,7 +121,7 @@ The *job artifacts* are included into the class path of Flink's JVM process with
* all other necessary dependencies or resources, not included into Flink.

To deploy a cluster for a single job with Docker, you need to
* make *job artifacts* available locally in all containers under `/opt/flink/usrlib` or pass jar path by *jar-file* argument.
* make *job artifacts* available locally in all containers under `/opt/flink/usrlib`, or pass a list of jars via the `--jars` argument
* start a JobManager container in the *Application cluster* mode
* start the required number of TaskManager containers.

@@ -156,7 +156,6 @@ To make the **job artifacts available** locally in the container, you can
* **or extend the Flink image** by writing a custom `Dockerfile`, build it and use it for starting the JobManager and TaskManagers:
```dockerfile
FROM flink
ADD /host/path/to/job/artifacts/1 /opt/flink/usrlib/artifacts/1
@@ -175,36 +174,39 @@ To make the **job artifacts available** locally in the container, you can
$ docker run flink_with_job_artifacts taskmanager
```
* **or pass jar path by jar-file argument** when you start the JobManager:
* **or pass jar path by `jars` argument** when you start the JobManager:
```sh
$ FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"
$ docker network create flink-network

$ docker run \
--env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
--env ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17-SNAPSHOT.jar \
--env ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{< version >}}.jar \
--name=jobmanager \
--network flink-network \
flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}} standalone-job \
--job-classname com.job.ClassName \
--jar-file s3://my-bucket/my-flink-job.jar
--jars s3://my-bucket/my-flink-job.jar,s3://my-bucket/my-flink-udf.jar \
[--job-id <job id>] \
[--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] \
[job arguments]

```
The `standalone-job` argument starts a JobManager container in the Application Mode.
#### JobManager additional command line arguments
You can provide the following additional command line arguments to the cluster entrypoint:
* `--job-classname <job class name>`: Class name of the job to run.
* `--job-classname <job class name>` (optional): Class name of the job to run.
By default, Flink scans its class path for a JAR with a Main-Class or program-class manifest entry and chooses it as the job class.
Use this command line argument to manually set the job class.
{{< hint warning >}}
This argument is required in case that no or more than one JAR with such a manifest entry is available on the class path.
{{< /hint >}}
* `--job-id <job id>` (optional): Manually set a Flink job ID for the job (default: 00000000000000000000000000000000)
@@ -216,12 +218,12 @@ You can provide the following additional command line arguments to the cluster entrypoint:
* `--allowNonRestoredState` (optional): Skip broken savepoint state
Additionally you can specify this argument to allow that savepoint state is skipped which cannot be restored.
Additionally, you can specify this argument to allow that savepoint state is skipped which cannot be restored.
* `--jar-file` (optional): the path of jar artifact
* `--jars` (optional): the paths of the job jar and any additional artifact(s) separated by commas
You can specify this argument to point the job artifacts stored in [flink filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) or Http/Https. Flink will fetch it when deploy the job.
(e.g., s3://my-bucket/my-flink-job.jar).
You can use this argument to point to job artifacts stored in a [Flink filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) or downloadable via HTTP(S).
Flink will fetch these during job deployment (e.g. `--jars s3://my-bucket/my-flink-job.jar`, `--jars s3://my-bucket/my-flink-job.jar,s3://my-bucket/my-flink-udf.jar`).
If the main function of the user job main class accepts arguments, you can also pass them at the end of the `docker run` command.
@@ -326,7 +328,7 @@ services:
image: flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}}
ports:
- "8081:8081"
command: standalone-job --job-classname com.job.ClassName [--job-id <job id>] [--fromSavepoint /path/to/savepoint] [--allowNonRestoredState] ["--jar-file" "/path/to/user-artifact"] [job arguments]
command: standalone-job --job-classname com.job.ClassName [--job-id <job id>] [--jars /path/to/artifact1,/path/to/artifact2] [--fromSavepoint /path/to/savepoint] [--allowNonRestoredState] [job arguments]
volumes:
- /host/path/to/job/artifacts:/opt/flink/usrlib
environment:
