[FLINK-4253] [config] Clean up renaming of 'recovery.mode'
- Renamed config keys and default values to be consistent
- Updated default flink-conf.yaml
- Cleaned up code occurrences of recovery mode

This closes apache#2342.
uce committed Aug 24, 2016
1 parent 01ffe34 commit 58165d6
Showing 37 changed files with 389 additions and 362 deletions.
34 changes: 20 additions & 14 deletions docs/setup/config.md
@@ -134,7 +134,7 @@ will be used under the directory specified by jobmanager.web.tmpdir.

- `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a Flink supported filesystem. Note: State backend must be accessible from the JobManager, use `file://` only for local setups.

- `high-availability.zookeeper.storageDir`: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this config was named as `recovery.zookeeper.storageDir`.
- `high-availability.zookeeper.storageDir`: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this key was named `recovery.zookeeper.storageDir`.

- `blob.storage.directory`: Directory for storing blobs (such as user jars) on the TaskManagers.

@@ -283,31 +283,37 @@ of the JobManager, because the same ActorSystem is used. It's not possible to use
For example, when running Flink on YARN in an environment with a restrictive firewall, this option allows specifying a range of allowed ports.


## High Availability Mode
## High Availability (HA)

- `high-availability`: (Default 'none') Defines the recovery mode used for the cluster execution. Currently, Flink supports the 'none' mode where only a single JobManager runs and no JobManager state is checkpointed. The high availability mode 'zookeeper' supports the execution of multiple JobManagers and JobManager state checkpointing. Among the group of JobManagers, ZooKeeper elects one of them as the leader which is responsible for the cluster execution. In case of a JobManager failure, a standby JobManager will be elected as the new leader and is given the last checkpointed JobManager state. In order to use the 'zookeeper' mode, it is mandatory to also define the `recovery.zookeeper.quorum` configuration value. Previously this config was named 'recovery.mode' and the default config was 'standalone'.
- `high-availability`: Defines the high availability mode used for the cluster execution. Currently, Flink supports the following modes:
- `none` (default): No high availability. A single JobManager runs and no JobManager state is checkpointed.
- `zookeeper`: Supports the execution of multiple JobManagers and JobManager state checkpointing. Among the group of JobManagers, ZooKeeper elects one of them as the leader which is responsible for the cluster execution. In case of a JobManager failure, a standby JobManager will be elected as the new leader and is given the last checkpointed JobManager state. In order to use the 'zookeeper' mode, it is mandatory to also define the `high-availability.zookeeper.quorum` configuration value.

- `high-availability.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used to connet to the ZooKeeper cluster when the 'zookeeper' recovery mode is selected. Previously this config was name as `recovery.zookeeper.quorum`.
Previously this key was named `recovery.mode` and the default value was `standalone`.
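For reference, a minimal `flink-conf.yaml` fragment enabling the `zookeeper` mode might look like the following sketch; the host names and port are placeholders, not values taken from this commit:

```yaml
# Enable ZooKeeper-based high availability (formerly 'recovery.mode: zookeeper')
high-availability: zookeeper
# Comma-separated list of ZooKeeper peers (formerly 'recovery.zookeeper.quorum')
high-availability.zookeeper.quorum: zk-host1:2181,zk-host2:2181,zk-host3:2181
```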

- `high-availability.zookeeper.path.root`: (Default '/flink') Defines the root dir under which the ZooKeeper recovery mode will create namespace directories. Previously this config was name as `recovery.zookeeper.path.root`.
### ZooKeeper-based HA Mode

- `high-availability.zookeeper.path.namespace`: (Default '/default_ns' in standalone mode, or the <yarn-application-id> under Yarn) Defines the subdirectory under the root dir where the ZooKeeper recovery mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. Previously this config was named as `recovery.zookeeper.path.namespace`.
- `high-availability.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used to connect to the ZooKeeper cluster when the 'zookeeper' HA mode is selected. Previously this key was named `recovery.zookeeper.quorum`.

- `high-availability.zookeeper.path.latch`: (Default '/leaderlatch') Defines the znode of the leader latch which is used to elect the leader. Previously this config was named as `recovery.zookeeper.path.latch`.
- `high-availability.zookeeper.path.root`: (Default `/flink`) Defines the root dir under which the ZooKeeper HA mode will create namespace directories. Previously this key was named `recovery.zookeeper.path.root`.

- `high-availability.zookeeper.path.leader`: (Default '/leader') Defines the znode of the leader which contains the URL to the leader and the current leader session ID Previously this config was named as `recovery.zookeeper.path.leader`.
- `high-availability.zookeeper.path.namespace`: (Default `/default_ns` in standalone cluster mode, or the `<yarn-application-id>` under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows isolating multiple applications on the same ZooKeeper installation. Previously this key was named `recovery.zookeeper.path.namespace`.

- `high-availability.zookeeper.storageDir`: Defines the directory in the state backend where the JobManager metadata will be stored (ZooKeeper only keeps pointers to it). Required for HA. Previously this config was named as `recovery.zookeeper.storageDir`.
- `high-availability.zookeeper.path.latch`: (Default `/leaderlatch`) Defines the znode of the leader latch which is used to elect the leader. Previously this key was named `recovery.zookeeper.path.latch`.

- `high-availability.zookeeper.client.session-timeout`: (Default '60000') Defines the session timeout for the ZooKeeper session in ms. Previously this config was named as `recovery.zookeeper.client.session-timeout`
- `high-availability.zookeeper.path.leader`: (Default `/leader`) Defines the znode of the leader which contains the URL to the leader and the current leader session ID. Previously this key was named `recovery.zookeeper.path.leader`.

- `high-availability.zookeeper.client.connection-timeout`: (Default '15000') Defines the connection timeout for ZooKeeper in ms. Previously this config was named as `recovery.zookeeper.client.connection-timeout`.
- `high-availability.zookeeper.storageDir`: Defines the directory in the state backend where the JobManager metadata will be stored (ZooKeeper only keeps pointers to it). Required for HA. Previously this key was named `recovery.zookeeper.storageDir`.

- `high-availability.zookeeper.client.retry-wait`: (Default '5000') Defines the pause between consecutive retries in ms. Previously this config was named as `recovery.zookeeper.client.retry-wait`.
- `high-availability.zookeeper.client.session-timeout`: (Default `60000`) Defines the session timeout for the ZooKeeper session in ms. Previously this key was named `recovery.zookeeper.client.session-timeout`.

- `high-availability.zookeeper.client.max-retry-attempts`: (Default '3') Defines the number of connection retries before the client gives up. Previously this config was named as `recovery.zookeeper.client.max-retry-attempts`.
- `high-availability.zookeeper.client.connection-timeout`: (Default `15000`) Defines the connection timeout for ZooKeeper in ms. Previously this key was named `recovery.zookeeper.client.connection-timeout`.

- `high-availability.job.delay`: (Default 'akka.ask.timeout') Defines the delay before persisted jobs are recovered in case of a recovery situation. Previously this config was named as `recovery.job.delay`.
- `high-availability.zookeeper.client.retry-wait`: (Default `5000`) Defines the pause between consecutive retries in ms. Previously this key was named `recovery.zookeeper.client.retry-wait`.

- `high-availability.zookeeper.client.max-retry-attempts`: (Default `3`) Defines the number of connection retries before the client gives up. Previously this key was named `recovery.zookeeper.client.max-retry-attempts`.

- `high-availability.job.delay`: (Default `akka.ask.timeout`) Defines the delay before persisted jobs are recovered in case of a master recovery situation. Previously this key was named `recovery.job.delay`.
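Since every key in this section was renamed from a `recovery.*` predecessor, migrating an old `flink-conf.yaml` is a mechanical substitution. The following is a hedged sketch, not a tool shipped with Flink: the `migrate_conf` helper and the mapping of the old `standalone` default to `none` are illustrations derived from the renames listed above.

```python
# Hypothetical helper: rewrite deprecated 'recovery.*' keys in
# flink-conf.yaml-style text to their new 'high-availability.*' names,
# based on the renames documented for FLINK-4253.
RENAMES = {
    "recovery.mode": "high-availability",
    "recovery.zookeeper.quorum": "high-availability.zookeeper.quorum",
    "recovery.zookeeper.path.root": "high-availability.zookeeper.path.root",
    "recovery.zookeeper.path.namespace": "high-availability.zookeeper.path.namespace",
    "recovery.zookeeper.path.latch": "high-availability.zookeeper.path.latch",
    "recovery.zookeeper.path.leader": "high-availability.zookeeper.path.leader",
    "recovery.zookeeper.storageDir": "high-availability.zookeeper.storageDir",
    "recovery.zookeeper.client.session-timeout": "high-availability.zookeeper.client.session-timeout",
    "recovery.zookeeper.client.connection-timeout": "high-availability.zookeeper.client.connection-timeout",
    "recovery.zookeeper.client.retry-wait": "high-availability.zookeeper.client.retry-wait",
    "recovery.zookeeper.client.max-retry-attempts": "high-availability.zookeeper.client.max-retry-attempts",
    "recovery.job.delay": "high-availability.job.delay",
}

def migrate_conf(text: str) -> str:
    """Return the config text with deprecated keys renamed in place."""
    out = []
    for line in text.splitlines():
        key, sep, value = line.partition(":")
        new_key = RENAMES.get(key.strip())
        if sep and new_key:
            # 'recovery.mode' default changed: 'standalone' became 'none'.
            if new_key == "high-availability" and value.strip() == "standalone":
                value = " none"
            line = new_key + ":" + value
        out.append(line)
    return "\n".join(out)
```

Lines whose key is not in the rename table (for example `state.backend`) pass through unchanged.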

## Environment

16 changes: 8 additions & 8 deletions docs/setup/jobmanager_high_availability.md
@@ -44,7 +44,7 @@ As an example, consider the following setup with three JobManager instances:

To enable JobManager High Availability you have to set the **high-availability mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters file** with all JobManager hosts and their web UI ports.

Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed coordination* between all running JobManager instances. ZooKeeper is a separate service from Flink, which provides highly reliable distributed coordination via leader election and light-weight consistent state storage. Check out [ZooKeeper's Getting Started Guide](http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html) for more information about ZooKeeper. Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper) installation.

#### Masters File (masters)

@@ -67,7 +67,6 @@ In order to start an HA-cluster add the following configuration keys to `conf/fl
- **high-availability mode** (required): The *high-availability mode* has to be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high availability mode.

<pre>high-availability: zookeeper</pre>
- Previously this config was named 'recovery.mode' and the default config was 'standalone'.

- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide the distributed coordination service.

@@ -83,31 +82,32 @@ In order to start an HA-cluster add the following configuration keys to `conf/fl

<pre>high-availability.zookeeper.path.namespace: /default_ns # important: customize per cluster</pre>

**Important**: if you are running multiple Flink HA clusters, you have to manually configure separate namespaces for each cluster. By default, the Yarn cluster and the Yarn session automatically generate namespaces based on Yarn application id. A manual configuration overrides this behaviour in Yarn. Specifying a namespace with the -z CLI option, in turn, overrides manual configuration.

- **State backend and storage directory** (required): JobManager metadata is persisted in the *state backend* and only a pointer to this state is stored in ZooKeeper. Currently, only the file system state backend is supported in HA mode.

<pre>
high-availability.zookeeper.storageDir: hdfs:///flink/recovery</pre>
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
high-availability.zookeeper.storageDir: hdfs:///flink/recovery</pre>

The `storageDir` stores all metadata needed to recover from a JobManager failure.

After configuring the masters and the ZooKeeper quorum, you can use the provided cluster startup scripts as usual. They will start an HA-cluster. Keep in mind that the **ZooKeeper quorum has to be running** when you call the scripts and make sure to **configure a separate ZooKeeper root path** for each HA cluster you are starting.

#### Example: Standalone Cluster with 2 JobManagers

1. **Configure recovery mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:
1. **Configure high availability mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:

<pre>
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.path.namespace: /cluster_one # important: customize per cluster
high-availability.zookeeper.storageDir: hdfs:///flink/recovery</pre>

state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
high-availability.zookeeper.storageDir: hdfs:///flink/recovery</pre>

2. **Configure masters** in `conf/masters`:

@@ -184,16 +184,16 @@ This means that the application can be restarted 10 times before YARN fails the

#### Example: Highly Available YARN Session

1. **Configure recovery mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:
1. **Configure HA mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:

<pre>
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.storageDir: hdfs:///flink/recovery
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.path.namespace: /cluster_one # important: customize per cluster
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
high-availability.zookeeper.storageDir: hdfs:///flink/recovery
yarn.application-attempts: 10</pre>

3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only possible to run a single ZooKeeper server per machine):
