diff --git a/docs/_includes/navbar.html b/docs/_includes/navbar.html
index 62bdce8db53e9..c565feb3cc4e0 100644
--- a/docs/_includes/navbar.html
+++ b/docs/_includes/navbar.html
@@ -81,8 +81,9 @@
Python API Beta
- Interactive Scala Shell
+ Fault Tolerance
State in Streaming Programs
+ Interactive Scala Shell
DataSet Transformations
Best Practices
Connectors (DataSet API)
diff --git a/docs/apis/fault_tolerance.md b/docs/apis/fault_tolerance.md
new file mode 100644
index 0000000000000..677ff95c7cde9
--- /dev/null
+++ b/docs/apis/fault_tolerance.md
@@ -0,0 +1,265 @@
+---
+title: "Fault Tolerance"
+is_beta: false
+---
+
+
+
+
+Flink's fault tolerance mechanism recovers programs in the presence of failures and
+continues to execute them. Such failures include machine hardware failures, network failures,
+transient program failures, etc.
+
+* This will be replaced by the TOC
+{:toc}
+
+
+Streaming Fault Tolerance (DataStream API)
+------------------------------------------
+
+Flink has a checkpointing mechanism that recovers streaming jobs after failures. The checkpointing mechanism requires a *persistent* (or *durable*) source that
+can be asked for prior records again (Apache Kafka is a good example of such a source).
+
+The checkpointing mechanism stores the progress in the data sources and data sinks, the state of windows, as well as the user-defined state (see [Working with State]({{ site.baseurl }}/apis/streaming_guide.html#working-with-state)) consistently to provide *exactly once* processing semantics. Where the checkpoints are stored (e.g., JobManager memory, file system, database) depends on the configured [state backend]({{ site.baseurl }}/apis/state_backends.html).
+
+The [docs on streaming fault tolerance]({{ site.baseurl }}/internals/stream_checkpointing.html) describe in detail the technique behind Flink's streaming fault tolerance mechanism.
+
+To enable checkpointing, call `enableCheckpointing(n)` on the `StreamExecutionEnvironment`, where *n* is the checkpoint interval in milliseconds.
+
+Other parameters for checkpointing include:
+
+- *Number of retries*: The `setNumberOfExecutionRetries()` method defines how many times the job is restarted after a failure.
+ When checkpointing is activated, but this value is not explicitly set, the job is restarted infinitely often.
+
+- *Exactly-once vs. at-least-once*: You can optionally pass a mode to the `enableCheckpointing(n)` method to choose between the two guarantee levels.
+ Exactly-once is preferable for most applications. At-least-once may be relevant for certain very low-latency (consistently a few milliseconds) applications.
+
+- *Number of concurrent checkpoints*: By default, the system will not trigger another checkpoint while one is still in progress. This ensures that the topology does not spend so much time on checkpoints that it stops making progress on processing the streams. It is possible to allow multiple overlapping checkpoints, which is useful for pipelines that have a certain processing delay (for example, because the functions call external services that need some time to respond) but that still want to checkpoint very frequently (every few hundred milliseconds) to re-process very little upon failures.
+
+- *Checkpoint timeout*: The time after which a checkpoint in progress is aborted if it has not completed by then.
+
+
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// start a checkpoint every 1000 ms
+env.enableCheckpointing(1000);
+
+// advanced options:
+
+// set mode to exactly-once (this is the default)
+env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+
+// checkpoints have to complete within one minute, or are discarded
+env.getCheckpointConfig().setCheckpointTimeout(60000);
+
+// allow only one checkpoint to be in progress at the same time
+env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+// start a checkpoint every 1000 ms
+env.enableCheckpointing(1000)
+
+// advanced options:
+
+// set mode to exactly-once (this is the default)
+env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
+
+// checkpoints have to complete within one minute, or are discarded
+env.getCheckpointConfig.setCheckpointTimeout(60000)
+
+// allow only one checkpoint to be in progress at the same time
+env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
+{% endhighlight %}
+
+
+
+
+### Fault Tolerance Guarantees of Data Sources and Sinks
+
+Flink can guarantee exactly-once state updates to user-defined state only when the source participates in the
+snapshotting mechanism. This is currently guaranteed for the Kafka source (and internal number generators), but
+not for other sources. The following table lists the state update guarantees of Flink coupled with the bundled sources:
+
+
+
+
+| Source | Guarantees | Notes |
+|--------|------------|-------|
+| Apache Kafka | exactly once | Use the appropriate Kafka connector for your version |
+| RabbitMQ | at most once (v 0.10) / exactly once (v 1.0) | |
+| Twitter Streaming API | at most once | |
+| Collections | exactly once | |
+| Files | at least once | At failure the file will be read from the beginning |
+| Sockets | at most once | |
+
+
+
+
+To guarantee end-to-end exactly-once record delivery (in addition to exactly-once state semantics), the data sink needs
+to take part in the checkpointing mechanism. The following table lists the delivery guarantees (assuming exactly-once
+state updates) of Flink coupled with bundled sinks:
+
+
+
+
+| Sink | Guarantees | Notes |
+|------|------------|-------|
+| HDFS rolling sink | exactly once | Implementation depends on Hadoop version |
+| Elasticsearch | at least once | |
+| Kafka producer | at least once | |
+| File sinks | at least once | |
+| Socket sinks | at least once | |
+| Standard output | at least once | |
+
+
+
+
+[Back to top](#top)
+
+
+Batch Processing Fault Tolerance (DataSet API)
+----------------------------------------------
+
+Fault tolerance for programs in the *DataSet API* works by retrying failed executions.
+The number of times that Flink retries the execution before the job is declared as failed is configurable
+via the *execution retries* parameter. A value of *0* effectively means that fault tolerance is deactivated.
+
+To activate the fault tolerance, set the *execution retries* to a value larger than zero. A common choice is a value
+of three.
+
+This example shows how to configure the execution retries for a Flink DataSet program.
+
+
+
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setNumberOfExecutionRetries(3);
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+env.setNumberOfExecutionRetries(3)
+{% endhighlight %}
+
+
+
+
+You can also define the default value for the number of execution retries in the `flink-conf.yaml`:
+
+~~~
+execution-retries.default: 3
+~~~
+
+
+Retry Delays
+------------
+
+Execution retries can be configured to be delayed. Delaying the retry means that after a failed execution, the re-execution does not start
+immediately, but only after a certain delay.
+
+Delaying the retries can be helpful when the program interacts with external systems where, for example, connections or pending transactions should reach a timeout before re-execution is attempted.
+
+You can set the retry delay for each program as follows (the sample shows the DataStream API; the DataSet API works similarly):
+
+
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.getConfig().setExecutionRetryDelay(5000); // 5000 milliseconds delay
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.getConfig.setExecutionRetryDelay(5000) // 5000 milliseconds delay
+{% endhighlight %}
+
+
+
+You can also define the default value for the retry delay in the `flink-conf.yaml`:
+
+~~~
+execution-retries.delay: 10 s
+~~~
+
+[Back to top](#top)
+
+
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 3bb597b4c26bc..366de221ef6e2 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -2889,136 +2889,7 @@ Execution Parameters
### Fault Tolerance
-Flink has a checkpointing mechanism that recovers streaming jobs after failues. The checkpointing mechanism requires a *persistent* or *durable* source that
-can be asked for prior records again (Apache Kafka is a good example of a durable source).
-
-The checkpointing mechanism stores the progress in the source as well as the user-defined state (see [Working with State](#working_with_state))
-consistently to provide *exactly once* processing guarantees.
-
-To enable checkpointing, call `enableCheckpointing(n)` on the `StreamExecutionEnvironment`, where *n* is the checkpoint interval in milliseconds.
-
-Other parameters for checkpointing include:
-
-- *Number of retries*: The `setNumberOfExecutionRerties()` method defines how many times the job is restarted after a failure.
- When checkpointing is activated, but this value is not explicitly set, the job is restarted infinitely often.
-- *exactly-once vs. at-least-once*: You can optionally pass a mode to the `enableCheckpointing(n)` method to choose between the two guarantee levels.
- Exactly-once is preferrable for most applications. At-least-once may be relevant for certain super-low-latency (consistently few milliseconds) applications.
-
-The [docs on streaming fault tolerance](../internals/stream_checkpointing.html) describe in detail the technique behind Flink's streaming fault tolerance mechanism.
-
-Flink can guarantee exactly-once state updates to user-defined state only when the source participates in the
-snapshotting mechanism. This is currently guaranteed for the Kafka source (and internal number generators), but
-not for other sources. The following table lists the state update guarantees of Flink coupled with the bundled sources:
-
-
-
-
-| Source | Guarantees | Notes |
-|--------|------------|-------|
-| Apache Kafka | exactly once | Use the appropriate Kafka connector for your version |
-| RabbitMQ | at most once | |
-| Twitter Streaming API | at most once | |
-| Collections | at most once | |
-| Files | at least once | At failure the file will be read from the beginning |
-| Sockets | at most once | |
-
-
-
-
-
-
-To guarantee end-to-end exactly-once record delivery (in addition to exactly-once updates), the data sink needs
-to take part in the snapshotting mechanism. The following table lists the delivery guarantees (assuming exactly-once
-state updates) of Flink coupled with bundled sinks:
-
-
-
-
-| Sink | Guarantees | Notes |
-|------|------------|-------|
-| HDFS rolling sink | exactly once | Implementation depends on Hadoop version |
-| Elasticsearch | at least once | |
-| Kafka producer | at least once | |
-| File sinks | at least once | |
-| Socket sinks | at lest once | |
-| Standard output | at least once | |
-
-
-
-
-
-
+The [Fault Tolerance Documentation]({{ site.baseurl }}/apis/fault_tolerance.html) describes the options and parameters to enable and configure Flink's checkpointing mechanism.
### Parallelism