[hotfix][docs] Update Yarn setup documentation with Flip-6

tillrohrmann committed Apr 11, 2019
1 parent 517a04f commit 60507b8
Showing 1 changed file with 18 additions and 29 deletions.
47 changes: 18 additions & 29 deletions docs/ops/deployment/yarn_setup.md
@@ -30,15 +30,15 @@ under the License.

### Start a long-running Flink cluster on YARN

-Start a YARN session with 4 Task Managers (each with 4 GB of Heapspace):
+Start a YARN session where the job manager gets 1 GB of heap space and each task manager gets 4 GB of heap space:

{% highlight bash %}
# get the hadoop2 package from the Flink download page at
# {{ site.download_url }}
curl -O <flink_hadoop2_download_url>
tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz
cd flink-{{ site.version }}/
-./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m
+./bin/yarn-session.sh -jm 1024m -tm 4096m
{% endhighlight %}

Specify the `-s` flag for the number of processing slots per Task Manager. We recommend setting the number of slots to the number of processors per machine.
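
For example, on a machine with four processors, one slot per processor could be requested like this (a sketch; the memory values are illustrative):

{% highlight bash %}
# one processing slot per processor on a 4-core machine (illustrative values)
./bin/yarn-session.sh -jm 1024m -tm 4096m -s 4
{% endhighlight %}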
@@ -53,7 +53,7 @@ Once the session has been started, you can submit jobs to the cluster using the
curl -O <flink_hadoop2_download_url>
tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz
cd flink-{{ site.version }}/
-./bin/flink run -m yarn-cluster -yn 4 -yjm 1024m -ytm 4096m ./examples/batch/WordCount.jar
+./bin/flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m ./examples/batch/WordCount.jar
{% endhighlight %}

## Flink YARN Session
@@ -96,8 +96,6 @@ This command will show you the following overview:

{% highlight bash %}
Usage:
-   Required
-     -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)
   Optional
     -D <arg>                        Dynamic properties
     -d,--detached                   Start detached
@@ -112,10 +110,10 @@ Usage:

Please note that the client requires the `YARN_CONF_DIR` or `HADOOP_CONF_DIR` environment variable to be set to read the YARN and HDFS configuration.
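
For example, assuming the Hadoop configuration lives under `/etc/hadoop/conf` (a hypothetical path; use your cluster's actual location):

{% highlight bash %}
# point the client at the YARN and HDFS configuration (path is an assumption)
export HADOOP_CONF_DIR=/etc/hadoop/conf
./bin/yarn-session.sh -tm 4096m
{% endhighlight %}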

-**Example:** Issue the following command to allocate 10 Task Managers, with 8 GB of memory and 32 processing slots each:
+**Example:** Issue the following command to start a YARN session cluster where each task manager is started with 8 GB of memory and 32 processing slots:

{% highlight bash %}
-./bin/yarn-session.sh -n 10 -tm 8192 -s 32
+./bin/yarn-session.sh -tm 8192 -s 32
{% endhighlight %}

The system will use the configuration in `conf/flink-conf.yaml`. Please follow our [configuration guide]({{ site.baseurl }}/ops/config.html) if you want to change something.
@@ -124,13 +122,15 @@ Flink on YARN will overwrite the following configuration parameters `jobmanager.

If you don't want to change the configuration file to set configuration parameters, you can pass dynamic properties via the `-D` flag, for example: `-Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624`.
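
For instance, a session start combining regular flags with dynamic properties could look like this (a sketch with illustrative values):

{% highlight bash %}
# the -D properties override the values from conf/flink-conf.yaml for this session
./bin/yarn-session.sh -tm 4096m -s 8 -Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624
{% endhighlight %}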

-The example invocation starts 11 containers (even though only 10 containers were requested), since there is one additional container for the ApplicationMaster and Job Manager.
+The example invocation starts a single container for the ApplicationMaster, which runs the Job Manager.

+The session cluster will automatically allocate additional containers which run the Task Managers when jobs are submitted to the cluster.
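
For example, with the session running, submitting a job through the regular client is enough to trigger the allocation of Task Manager containers (a minimal sketch; the client picks up the session's connection details automatically):

{% highlight bash %}
# submit a job to the running session; Task Manager containers are allocated on demand
./bin/flink run ./examples/batch/WordCount.jar
{% endhighlight %}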

Once Flink is deployed in your YARN cluster, it will show you the connection details of the Job Manager.

Stop the YARN session by stopping the Unix process (using CTRL+C) or by entering 'stop' into the client.
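
A detached session (`-d`) can be re-attached to or stopped through YARN itself; the application id below is a placeholder:

{% highlight bash %}
# re-attach to a running session (application id is a placeholder)
./bin/yarn-session.sh -id application_1234567890001_0001
# or stop it via the YARN CLI
yarn application -kill application_1234567890001_0001
{% endhighlight %}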

-Flink on YARN will only start all requested containers if enough resources are available on the cluster. Most YARN schedulers account for the requested memory of the containers,
+Flink on YARN will only start if enough resources are available for the ApplicationMaster on the cluster. Most YARN schedulers account for the requested memory of the containers,
some also account for the number of vcores. By default, the number of vcores is equal to the processing slots (`-s`) argument. The [`yarn.containers.vcores`]({{ site.baseurl }}/ops/config.html#yarn-containers-vcores) option allows overwriting the number of vcores with a custom value. In order for this parameter to work, you should enable CPU scheduling in your cluster.
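
For example, to request more vcores per Task Manager container than processing slots (a sketch; this only takes effect with CPU scheduling enabled in YARN):

{% highlight bash %}
# request 8 vcores per Task Manager container while using 4 processing slots
./bin/yarn-session.sh -tm 4096m -s 4 -Dyarn.containers.vcores=8
{% endhighlight %}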

@@ -231,12 +231,10 @@ If the TaskManagers do not show up after a minute, you should investigate the is

The documentation above describes how to start a Flink cluster within a Hadoop YARN environment. It is also possible to launch Flink within YARN only for executing a single job.

-Please note that the client then expects the `-yn` value to be set (number of TaskManagers).

***Example:***

{% highlight bash %}
-./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
+./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar
{% endhighlight %}

The command line options of the YARN session are also available with the `./bin/flink` tool. They are prefixed with a `y` or `yarn` (for the long argument options).
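
For example, the session options `-jm`, `-tm` and `-s` become `-yjm`, `-ytm` and `-ys` when passed to `./bin/flink` (a sketch with illustrative values):

{% highlight bash %}
# per-job cluster with y-prefixed session options
./bin/flink run -m yarn-cluster -yjm 1024m -ytm 4096m -ys 4 ./examples/batch/WordCount.jar
{% endhighlight %}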
@@ -261,8 +259,6 @@ The user-jars position in the class path can be controlled by setting the parame

Flink's YARN client has the following configuration parameters to control its behavior in case of container failures. These parameters can be set either from the `conf/flink-conf.yaml` or, when starting the YARN session, using `-D` parameters.

-- `yarn.reallocate-failed`: This parameter controls whether Flink should reallocate failed TaskManager containers. Default: true
-- `yarn.maximum-failed-containers`: The maximum number of failed containers the ApplicationMaster accepts until it fails the YARN session. Default: The number of initially requested TaskManagers (`-n`).
- `yarn.application-attempts`: The number of ApplicationMaster (+ its TaskManager containers) attempts. If this value is set to 1 (default), the entire YARN session will fail when the ApplicationMaster fails. Higher values specify the number of restarts of the ApplicationMaster by YARN.
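
For example, allowing YARN to restart the ApplicationMaster a few times before giving up the session could look like this (a sketch; the value is illustrative):

{% highlight bash %}
# allow up to 4 ApplicationMaster attempts before the session fails
./bin/yarn-session.sh -tm 4096m -Dyarn.application-attempts=4
{% endhighlight %}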

## Debugging a failed YARN session
@@ -297,25 +293,18 @@ Users using Hadoop distributions from companies like Hortonworks, Cloudera or Ma

Some YARN clusters use firewalls for controlling the network traffic between the cluster and the rest of the network.
In those setups, Flink jobs can only be submitted to a YARN session from within the cluster's network (behind the firewall).
-If this is not feasible for production use, Flink allows to configure a port range for all relevant services. With these
-ranges configured, users can also submit jobs to Flink crossing the firewall.
-
-Currently, two services are needed to submit a job:
+If this is not feasible for production use, Flink allows configuring a port range for its REST endpoint, which is used for
+client-cluster communication. With this range configured, users can also submit jobs to Flink across the firewall.

-* The JobManager (ApplicationMaster in YARN)
-* The BlobServer running within the JobManager.
-
-When submitting a job to Flink, the BlobServer will distribute the jars with the user code to all worker nodes (TaskManagers).
-The JobManager receives the job itself and triggers the execution.
+The configuration parameter for specifying the REST endpoint port is the following:
+
+* `rest.bind-port`

-The two configuration parameters for specifying the ports are the following:
-
-* `yarn.application-master.port`
-* `blob.server.port`
-
-These two configuration options accept single ports (for example: "50010"), ranges ("50000-50025"), or a combination of
+This configuration option accepts single ports (for example: "50010"), ranges ("50000-50025"), or a combination of
both ("50010,50011,50020-50025,50050-50075").

+Please make sure that the configuration option `rest.port` has not been specified, because it has precedence over `rest.bind-port` and accepts no ranges.

(Hadoop uses a similar mechanism; there the configuration parameter is called `yarn.app.mapreduce.am.job.client.port-range`.)
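
A session start with the REST endpoint bound to a port range might look like this (a sketch; the range itself is arbitrary and must be open in the firewall):

{% highlight bash %}
# bind the REST endpoint to a port range that the firewall permits
./bin/yarn-session.sh -tm 4096m -Drest.bind-port=50100-50200
{% endhighlight %}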

## Background / Internals
@@ -329,7 +318,7 @@ The YARN client needs to access the Hadoop configuration to connect to the YARN
* Test if `YARN_CONF_DIR`, `HADOOP_CONF_DIR` or `HADOOP_CONF_PATH` are set (in that order). If one of these variables is set, it is used to read the configuration.
* If the above strategy fails (this should not be the case in a correct YARN setup), the client uses the `HADOOP_HOME` environment variable. If it is set, the client tries to access `$HADOOP_HOME/etc/hadoop` (Hadoop 2) and `$HADOOP_HOME/conf` (Hadoop 1).

-When starting a new Flink YARN session, the client first checks if the requested resources (containers and memory) are available. After that, it uploads a jar that contains Flink and the configuration to HDFS (step 1).
+When starting a new Flink YARN session, the client first checks if the requested resources (memory and vcores for the ApplicationMaster) are available. After that, it uploads a jar that contains Flink and the configuration to HDFS (step 1).

The next step of the client is to request (step 2) a YARN container to start the *ApplicationMaster* (step 3). Since the client registered the configuration and jar-file as a resource for the container, the NodeManager of YARN running on that particular machine will take care of preparing the container (e.g. downloading the files). Once that has finished, the *ApplicationMaster* (AM) is started.

