Skip to content

Commit

Permalink
[streaming] [docs] Updated streaming guide for new state interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora committed Jun 25, 2015
1 parent 56ae08e commit 0a4144e
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 35 deletions.
56 changes: 28 additions & 28 deletions docs/apis/streaming_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1188,51 +1188,56 @@ Rich functions provide, in addition to the user-defined function (`map()`, `redu
Stateful computation
------------

Flink supports the checkpointing and persistence of user defined state, so in case of a failure this state can be restored to the latest checkpoint and the processing can continue from there. This gives exactly once semantics for anything that is stored in the state when the sources are stateful as well and checkpoint their current offset. The `PersistentKafkaSource` provides this stateful functionality for example.
Flink supports the checkpointing and persistence of user defined operator state, so in case of a failure this state can be restored to the latest checkpoint and the processing will continue from there. This gives exactly once processing semantics with respect to the operator states when the sources follow this stateful pattern as well. In practice this usually means that sources keep track of their current offset as their OperatorState. The `PersistentKafkaSource` provides this stateful functionality for reading streams from Kafka.

For example when implementing a rolling count over the stream Flink gives you the possibility to safely store the counter. Another common usecase is when reading from a Kafka source to save the latest committed offset to catch up from. To mark a function for checkpointing it has to implement the `flink.streaming.api.checkpoint.Checkpointed` interface or preferably its special case where the checkpointing can be done asynchronously, `CheckpointedAsynchronously`.
Flink supports two ways of accessing operator states: partitioned and non-partitioned state access.
In case of non-partitioned state access, an operator state is maintained for each parallel instance of a given operator. When `OperatorState.getState()` is called a separate state is returned in each parallel instance. In practice this means if we keep a counter for the received inputs in a mapper, `getState()` will return number of inputs processed by each parallel mapper.

Checkpointing can be enabled from the `StreamExecutionEnvironment` using the `enableCheckpointing(…)` where additional parameters can be passed to modify the default 5 second checkpoint interval.
In case of partitioned state access the user needs to define a `KeyExtractor` which will assign a key to each input of the stateful operator:

`stream.map(counter).setStatePartitioner(…)`

A separate `OperatorState` is maintained for each received key which can be used for instance to count received inputs by different keys, or store and update summary statistics of different sub-streams.

Checkpointing of the states needs to be enabled from the `StreamExecutionEnvironment` using the `enableCheckpointing(…)` where additional parameters can be passed to modify the default 5 second checkpoint interval.

Operators can be accessed from the `RuntimeContext` using the `getOperatorState(“name”, defaultValue)` method so it is only accessible in `RichFunction`s. A recommended usage pattern is to retrieve the operator state in the `open(…)` method of the operator and set it as a field in the operator instance for runtime usage. Multiple `OperatorState`s can be used simultaneously by the same operator by using different names to identify them.

By default operator states are checkpointed using default java serialization thus they need to be `Serializable`. The user can gain more control over the state checkpoint mechanism by passing a `StateCheckpointer` instance when retrieving the `OperatorState` from the `RuntimeContext`. The `StateCheckpointer` allows custom implementations for the checkpointing logic for increased efficiency and to store arbitrary non-serializable states.

By default state checkpoints will be stored in-memory at the JobManager. Flink also supports storing the checkpoints on any flink-supported file system (such as HDFS or Tachyon) which can be set in the flink-conf.yaml. Note that the state backend must be accessible from the JobManager, use `file://` only for local setups.

For example let us write a reduce function that besides summing the data it also counts have many elements it has seen.

{% highlight java %}
public class CounterSum implements ReduceFunction<Long>, CheckpointedAsynchronously<Long> {
public class CounterSum implements RichReduceFunction<Long> {

//persistent counter
private long counter = 0;
private OperatorState<Long> counter;

@Override
public Long reduce(Long value1, Long value2) throws Exception {
counter++;
counter.updateState(counter.getState() + 1);
return value1 + value2;
}

// regularly persists state during normal operation
@Override
public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
return new Long(counter);
}

// restores state on recovery from failure
@Override
public void restoreState(Serializable state) {
counter = (Long) state;
public void open(Configuration config) {
counter = getRuntimeContext().getOperatorState(“counter”, 0L);
}
}
{% endhighlight %}

Stateful sources require a bit more care as opposed to other operators they are not data driven, but their `run(SourceContext)` methods potentially run infinitely. In order to make the updates to the state and output collection atomic the user is required to get a lock from the source's context.

{% highlight java %}
public static class CounterSource implements SourceFunction<Long>, CheckpointedAsynchronously<Long> {
public static class CounterSource implements RichParallelSourceFunction<Long> {

// utility for job cancellation
private volatile boolean isRunning = false;

private long counter;
// maintain the current offset for exactly once semantics
private OperatorState<Long> offset;

@Override
public void run(SourceContext<Long> ctx) throws Exception {
Expand All @@ -1242,25 +1247,20 @@ public static class CounterSource implements SourceFunction<Long>, CheckpointedA
while (isRunning) {
// output and state update are atomic
synchronized (lock){
ctx.collect(counter);
counter++;
ctx.collect(offset);
offset.updateState(offset.getState() + 1);
}
}
}

@Override
public void cancel() {
isRunning = false;
public void open(Configuration config) {
offset = getRuntimeContext().getOperatorState(“offset”, 0L);
}

@Override
public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
return new Long(counter);
}

@Override
public void restoreState(Serializable state) {
counter = (Long) state;
public void cancel() {
isRunning = false;
}
}
{% endhighlight %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
* non-partitioned user states.
*
* State can be accessed and manipulated using the {@link #getState()} and
* {@link #updateState(T)} methods. These calls are only valid in the
* {@link #updateState(T)} methods. These calls are only safe in the
* transformation call the operator represents, for instance inside
* {@link MapFunction#map()} and invalid in
* {@link MapFunction#map()} and can lead tp unexpected behavior in the
* {@link #open(org.apache.flink.configuration.Configuration)} or
* {@link #close()} methods.
*
Expand All @@ -39,10 +39,10 @@ public interface OperatorState<T> {

/**
* Gets the current state for the operator. When the state is not
* partitioned the returned state is the same for all inputs. If state
* partitioning is applied the state returned depends on the current
* operator input, as the operator maintains an independent state for each
* partition.
* partitioned the returned state is the same for all inputs in a given
* operator instance. If state partitioning is applied, the state returned
* depends on the current operator input, as the operator maintains an
* independent state for each partition.
*
* @return The operator state corresponding to the current input.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public <S extends Serializable> OperatorState<S> getOperatorState(String name, S
}

/**
* Creates an empty state depending on the partitioning state.
* Creates an empty {@link OperatorState}.
*
* @return An empty operator state.
*/
Expand Down

0 comments on commit 0a4144e

Please sign in to comment.