[docs] [streaming] Added states and fold to the streaming docs
mbalassi committed Jun 8, 2015
1 parent fcca75c commit ee7c417
Showing 1 changed file with 134 additions and 5 deletions: docs/apis/streaming_guide.md
@@ -23,7 +23,7 @@ under the License.

<a href="#top"></a>

Flink Streaming is a system for high-throughput, low-latency data stream processing. The system can connect to and process data streams from different data sources like file sources, web sockets, message queues (Apache Kafka, RabbitMQ, Twitter Streaming API …), and also from any user defined data source using a very simple interface. Data streams can be transformed and modified to create new data streams using high-level functions similar to the ones provided by the batch processing API. Flink Streaming natively supports flexible, data-driven windowing semantics and iterative stream processing. The processed data can be pushed to different output types.
Flink Streaming is a system for high-throughput, low-latency data stream processing. Flink Streaming natively supports [stateful computation](#stateful-computation), data-driven [windowing semantics](#window-operators) and [iterative](#iterations) stream processing. The system can connect to and process data streams from different data sources like file sources, web sockets, message queues (Apache Kafka, RabbitMQ, Twitter Streaming API …), and also from any user defined data sources. Data streams can be transformed and modified to create new data streams using high-level functions similar to the ones provided by the batch processing API.

* This will be replaced by the TOC
{:toc}
@@ -335,7 +335,31 @@ a cancel request. The source can communicate with the outside world using the so
example, the `emit(element)` method is used to emit one element from the source. Most sources will
have an infinite while loop inside the `run()` method to read from the input and emit elements.
Upon invocation of the `cancel()` method the source is required to break out of its internal
loop and return from the `run()` method.
loop and return from the `run()` method. A common implementation for this is the following:

{% highlight java %}
public static class MySource implements SourceFunction<Long> {

    // utility for job cancellation
    private volatile boolean isRunning = false;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        isRunning = true;
        while (isRunning) {
            // read from the input and emit elements here;
            // the isRunning flag should be checked frequently
        }
    }

    // invoked by the framework in case of job cancellation
    @Override
    public void cancel() {
        isRunning = false;
    }
}
{% endhighlight %}
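
Such a source can then be attached to a streaming program with `addSource`. A minimal usage sketch, assuming an already created `StreamExecutionEnvironment` named `env`:

{% highlight java %}
// attach the user defined source to the program
DataStream<Long> stream = env.addSource(new MySource());
{% endhighlight %}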

In addition to the bounded data sources (with similar method signatures as the
[batch API](programming_guide.html#data-sources)) there are several predefined stream sources
@@ -471,6 +495,23 @@ dataStream.reduce(new ReduceFunction<Integer>() {
</td>
</tr>

<tr>
<td><strong>Fold</strong></td>
<td>
<p>Combines the stream element by element, starting from an initial aggregator value. Fold may be applied on a full, windowed, or grouped data stream.
<br/>
A fold function that appends the incoming strings one by one to an initially empty string:</p>
{% highlight java %}
dataStream.fold("", new FoldFunction<String, String>() {
@Override
public String fold(String accumulator, String value) throws Exception {
return accumulator + value;
}
});
{% endhighlight %}
</td>
</tr>

<tr>
<td><strong>Union</strong></td>
<td>
@@ -562,8 +603,8 @@ dataStream.filter{ _ != 0 }

<tr>
<td><strong>Reduce</strong></td>
<td>
<p>Combines a group of elements into a single element by repeatedly combining two elements
<td>
<p>Combines a stream of elements into another stream by repeatedly combining two elements
into one and emits the current state after every reduction. Reduce may be applied on a full, windowed or grouped data stream.
<br/>
@@ -572,7 +613,19 @@ dataStream.filter{ _ != 0 }
<br/>
A reducer that sums up the incoming stream; the result is a stream of intermediate sums:</p>
{% highlight scala %}
dataStream.reduce{ _ + _ }
{% endhighlight %}
</td>
</tr>

<tr>
<td><strong>Fold</strong></td>
<td>
<p>Combines the stream element by element, starting from an initial aggregator value. Fold may be applied on a full, windowed, or grouped data stream.
<br/>
A fold function that appends the incoming strings one by one to an initially empty string:</p>
{% highlight scala %}
dataStream.fold{"", _ + _ }
{% endhighlight %}
</td>
</tr>
@@ -1128,6 +1181,82 @@ Rich functions provide, in addition to the user-defined function (`map()`, `redu

[Back to top](#top)

Stateful computation
------------

Flink supports the checkpointing and persistence of user-defined state, so in case of a failure this state can be restored to the latest checkpoint and processing can continue from there. This gives exactly-once semantics for anything that is stored in the state. For example, when implementing a rolling count over the stream, Flink gives you the possibility to safely store the counter. Another common use case is, when reading from a Kafka source, to save the latest committed offset to catch up from. To mark a source for checkpointing it has to implement the `flink.streaming.api.checkpoint.Checkpointed` interface, or preferably its special case `CheckpointedAsynchronously`, where the checkpointing can be done asynchronously. As an example, let us write a reduce function that, besides summing the data, also counts how many elements it has seen.

{% highlight java %}
public class CounterSum implements ReduceFunction<Long>, CheckpointedAsynchronously<Long> {

    // persistent counter
    private long counter = 0;

    @Override
    public Long reduce(Long value1, Long value2) throws Exception {
        counter++;
        return value1 + value2;
    }

    // regularly persists state during normal operation
    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        return counter;
    }

    // restores state on recovery from failure
    @Override
    public void restoreState(Long state) {
        counter = state;
    }
}
{% endhighlight %}
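
Note that state snapshots are only drawn when checkpointing is enabled for the program. A minimal sketch of turning it on, assuming a `StreamExecutionEnvironment` named `env` and a 5 second checkpoint interval:

{% highlight java %}
// draw a snapshot of the operator states every 5000 milliseconds
env.enableCheckpointing(5000);
{% endhighlight %}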

Stateful sources require a bit more care: as opposed to other operators, they are not data-driven, but their `run(SourceContext)` methods potentially run infinitely. In order to make the updates to the state and the output collection atomic, the user is required to get a lock from the source's context.

{% highlight java %}
public static class CounterSource implements SourceFunction<Long>, CheckpointedAsynchronously<Long> {

    // utility for job cancellation
    private volatile boolean isRunning = false;

    // persistent counter
    private long counter;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        isRunning = true;
        Object lock = ctx.getCheckpointLock();
        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(counter);
                counter++;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        return counter;
    }

    @Override
    public void restoreState(Long state) {
        counter = state;
    }
}
{% endhighlight %}
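
Holding the checkpoint lock around the output and the counter increment is what makes the two atomic with respect to checkpoints: a state snapshot can never be taken between a call to `collect` and the corresponding counter update.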

Some operators might need to know when a checkpoint has been fully acknowledged by Flink in order to communicate that to the outside world. In this case see the `flink.streaming.api.checkpoint.CheckpointComitter` interface.
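
As a rough sketch, such an operator could combine a sink with this interface. Note that the callback name `commitCheckpoint(long checkpointId)` below is only an assumed placeholder for illustration; consult the `CheckpointComitter` interface itself for the actual method signature.

{% highlight java %}
public static class CommittingSink implements SinkFunction<Long>, CheckpointComitter {

    @Override
    public void invoke(Long value) throws Exception {
        // tentatively write the value to the external system
    }

    // assumed callback name, for illustration only: invoked once the
    // checkpoint has been fully acknowledged by Flink
    public void commitCheckpoint(long checkpointId) {
        // signal the external system that data up to this checkpoint is final
    }
}
{% endhighlight %}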

[Back to top](#top)

Lambda expressions with Java 8
------------
