diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 40cbbbcce13fe..c365dc3235194 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -23,7 +23,7 @@ under the License.
 
-Flink Streaming is a system for high-throughput, low-latency data stream processing. The system can connect to and process data streams from different data sources like file sources, web sockets, message queues (Apache Kafka, RabbitMQ, Twitter Streaming API …), and also from any user defined data source using a very simple interface. Data streams can be transformed and modified to create new data streams using high-level functions similar to the ones provided by the batch processing API. Flink Streaming natively supports flexible, data-driven windowing semantics and iterative stream processing. The processed data can be pushed to different output types.
+Flink Streaming is a system for high-throughput, low-latency data stream processing. Flink Streaming natively supports [stateful computation](#stateful-computation), data-driven [windowing semantics](#window-operators) and [iterative](#iterations) stream processing. The system can connect to and process data streams from different data sources like file sources, web sockets, message queues (Apache Kafka, RabbitMQ, Twitter Streaming API …), and also from any user-defined data source. Data streams can be transformed and modified to create new data streams using high-level functions similar to the ones provided by the batch processing API.
 
 * This will be replaced by the TOC
 {:toc}
 
@@ -335,7 +335,31 @@ a cancel request. The source can communicate with the outside world using the so
 example, the `emit(element)` method is used to emit one element from the source. Most sources
 will have an infinite while loop inside the `run()` method to read from the input and emit elements.
 Upon invocation of the `cancel()` method the source is required to break out of its internal
-loop and return from the `run()` method.
+loop and return from the `run()` method. A common implementation for this is the following:
+
+{% highlight java %}
+public static class MySource implements SourceFunction {
+
+    // utility for job cancellation
+    private volatile boolean isRunning = false;
+
+    @Override
+    public void run(SourceContext ctx) throws Exception {
+        isRunning = true;
+        while (isRunning) {
+            // read from the input and emit elements here;
+            // the isRunning flag should be checked frequently
+        }
+    }
+
+    // invoked by the framework in case of job cancellation
+    @Override
+    public void cancel() {
+        isRunning = false;
+    }
+}
+{% endhighlight %}
 
 In addition to the bounded data sources (with similar method signatures as the
 [batch API](programming_guide.html#data-sources)) there are several predefined stream sources
@@ -471,6 +495,23 @@ dataStream.reduce(new ReduceFunction() {
 
+
+          Fold
+
+

Combines a stream, element by element, with an initial aggregator value. Fold may be applied on a full, windowed or grouped data stream.
+
+
            A folder that appends strings one by one to the empty string:

+{% highlight java %}
+dataStream.fold("", new FoldFunction<String, String>() {
+  @Override
+  public String fold(String accumulator, String value) throws Exception {
+    return accumulator + value;
+  }
+});
+{% endhighlight %}
+
+
+          Union
 
@@ -562,8 +603,8 @@ dataStream.filter{ _ != 0 }
 
           Reduce
-
-

Combines a group of elements into a single element by repeatedly combining two elements + +

Combines a stream of elements into a stream of partial results by repeatedly combining two elements into one, emitting the current result after every reduction. Reduce may be applied on a full, windowed or grouped data stream.
@@ -572,7 +613,19 @@ dataStream.filter{ _ != 0 }
A reducer that sums up the incoming stream; the result is a stream of intermediate sums:

 {% highlight scala %}
 dataStream.reduce{ _ + _ }
 {% endhighlight %}
+
+
+
+
+          Fold
+
+

Combines a stream, element by element, with an initial aggregator value. Fold may be applied on a full, windowed or grouped data stream.
+
+
            A folder that appends strings one by one to the empty string:

+{% highlight scala %}
+dataStream.fold(""){ _ + _ }
+{% endhighlight %}
+
+
@@ -1128,6 +1181,82 @@ Rich functions provide, in addition to the user-defined function (`map()`, `redu
 
 [Back to top](#top)
 
+Stateful computation
+------------
+
+Flink supports the checkpointing and persistence of user-defined state, so in case of a failure this state can be restored to the latest checkpoint and processing can continue from there. This gives exactly-once semantics for anything that is stored in the state. For example, when implementing a rolling count over the stream, Flink gives you the possibility to safely store the counter. Another common use case is saving the latest committed offset when reading from a Kafka source, so that a recovering job knows where to catch up from. To mark a function for checkpointing, it has to implement the `flink.streaming.api.checkpoint.Checkpointed` interface or, preferably, its special case `CheckpointedAsynchronously`, which allows the checkpointing to be done asynchronously. As an example, let us write a reduce function that, besides summing the data, also counts how many elements it has seen.
+
+{% highlight java %}
+public class CounterSum implements ReduceFunction<Long>, CheckpointedAsynchronously {
+
+    // persistent counter
+    private long counter = 0;
+
+    @Override
+    public Long reduce(Long value1, Long value2) throws Exception {
+        counter++;
+        return value1 + value2;
+    }
+
+    // regularly persists state during normal operation
+    @Override
+    public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+        return new Long(counter);
+    }
+
+    // restores state on recovery from failure
+    @Override
+    public void restoreState(Serializable state) {
+        counter = (Long) state;
+    }
+}
+{% endhighlight %}
+
+Stateful sources require a bit more care: as opposed to other operators, they are not driven by incoming data, but their `run(SourceContext)` methods potentially run infinitely. In order to make the updates to the state and the output collection atomic, the user is required to get a lock from the source's context.
+
+{% highlight java %}
+public static class CounterSource implements SourceFunction<Long>, CheckpointedAsynchronously {
+
+    // utility for job cancellation
+    private volatile boolean isRunning = false;
+
+    private long counter;
+
+    @Override
+    public void run(SourceContext<Long> ctx) throws Exception {
+        isRunning = true;
+        Object lock = ctx.getCheckpointLock();
+
+        while (isRunning) {
+            // output and state update are atomic
+            synchronized (lock) {
+                ctx.collect(counter);
+                counter++;
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        isRunning = false;
+    }
+
+    @Override
+    public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+        return new Long(counter);
+    }
+
+    @Override
+    public void restoreState(Serializable state) {
+        counter = (Long) state;
+    }
+}
+{% endhighlight %}
+
+Some operators might need to be notified when a checkpoint has been fully acknowledged by Flink in order to communicate that with the outside world. In this case see the `flink.streaming.api.checkpoint.CheckpointComitter` interface.
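+
+For instance, a sink that should only make results visible to the outside world once the checkpoint that produced them has been acknowledged could buffer its input per checkpoint, as in the following sketch. Note that the `commitCheckpoint(long checkpointId)` callback name used here is an assumption; see the interface itself for the exact signature.
+
+{% highlight java %}
+public static class BufferingSink implements SinkFunction<String>, CheckpointComitter {
+
+    // elements received since the last acknowledged checkpoint
+    private List<String> buffer = new ArrayList<String>();
+
+    @Override
+    public void invoke(String value) throws Exception {
+        // hold elements back until their checkpoint is acknowledged
+        buffer.add(value);
+    }
+
+    // assumed callback: invoked once Flink has fully acknowledged a checkpoint
+    @Override
+    public void commitCheckpoint(long checkpointId) {
+        // only now communicate the buffered elements to the outside world
+        for (String element : buffer) {
+            System.out.println(element);
+        }
+        buffer.clear();
+    }
+}
+{% endhighlight %}
+
+For end-to-end consistency the buffer itself would also need to be part of the checkpointed state, for example by additionally implementing `Checkpointed`.
+
+[Back to top](#top)
+
 Lambda expressions with Java 8
 ------------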