[FLINK-11817][docs] Replace fold example in DataStream API Tutorial
leesf authored and zentol committed Mar 20, 2019
1 parent 4bd2db2 commit 96fd4c5
Showing 1 changed file with 46 additions and 16 deletions.
62 changes: 46 additions & 16 deletions docs/tutorials/datastream_api.md
@@ -178,18 +178,33 @@ that we want to aggregate the sum of edited bytes for every five seconds:
 {% highlight java %}
 DataStream<Tuple2<String, Long>> result = keyedEdits
     .timeWindow(Time.seconds(5))
-    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
+    .aggregate(new AggregateFunction<WikipediaEditEvent, Tuple2<String, Long>, Tuple2<String, Long>>() {
         @Override
-        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
-            acc.f0 = event.getUser();
-            acc.f1 += event.getByteDiff();
-            return acc;
+        public Tuple2<String, Long> createAccumulator() {
+            return new Tuple2<>("", 0L);
         }
+
+        @Override
+        public Tuple2<String, Long> add(WikipediaEditEvent value, Tuple2<String, Long> accumulator) {
+            accumulator.f0 = value.getUser();
+            accumulator.f1 += value.getByteDiff();
+            return accumulator;
+        }
+
+        @Override
+        public Tuple2<String, Long> getResult(Tuple2<String, Long> accumulator) {
+            return accumulator;
+        }
+
+        @Override
+        public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
+            return new Tuple2<>(a.f0, a.f1 + b.f1);
+        }
     });
 {% endhighlight %}

 The first call, `.timeWindow()`, specifies that we want to have tumbling (non-overlapping) windows
-of five seconds. The second call specifies a *Fold transformation* on each window slice for
+of five seconds. The second call specifies an *Aggregate transformation* on each window slice for
 each unique key. In our case we start from an initial value of `("", 0L)` and add to it the byte
 difference of every edit in that time window for a user. The resulting Stream now contains
 a `Tuple2<String, Long>` for every user which gets emitted every five seconds.
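
To make the four callbacks concrete, here is a minimal, Flink-free sketch of how a window operator might drive them for one key and one window. `SimpleAggregate`, `Edit`, `UserBytes`, and `aggregateWindow` are hypothetical stand-ins invented for illustration. They mirror the shape of the `AggregateFunction` in this change but are not Flink classes, and a real operator also deals with state, triggers, and serialization.

```java
import java.util.Arrays;
import java.util.List;

public class AggregateSketch {

    // Hypothetical stand-in for the four-callback contract; not a Flink type.
    interface SimpleAggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Hypothetical edit event: a user name and the byte difference of one edit.
    static final class Edit {
        final String user;
        final long byteDiff;

        Edit(String user, long byteDiff) {
            this.user = user;
            this.byteDiff = byteDiff;
        }
    }

    // Hypothetical accumulator playing the role of Tuple2<String, Long>.
    static final class UserBytes {
        String user;
        long bytes;

        UserBytes(String user, long bytes) {
            this.user = user;
            this.bytes = bytes;
        }
    }

    // Same logic as the tutorial's function: remember the user, sum the byte diffs.
    static final SimpleAggregate<Edit, UserBytes, UserBytes> SUM_BYTES =
        new SimpleAggregate<Edit, UserBytes, UserBytes>() {
            @Override
            public UserBytes createAccumulator() {
                return new UserBytes("", 0L);
            }

            @Override
            public UserBytes add(Edit value, UserBytes accumulator) {
                accumulator.user = value.user;
                accumulator.bytes += value.byteDiff;
                return accumulator;
            }

            @Override
            public UserBytes getResult(UserBytes accumulator) {
                return accumulator;
            }

            @Override
            public UserBytes merge(UserBytes a, UserBytes b) {
                return new UserBytes(a.user, a.bytes + b.bytes);
            }
        };

    // Drives the callbacks for the elements of a single key in a single window.
    static <IN, ACC, OUT> OUT aggregateWindow(SimpleAggregate<IN, ACC, OUT> fn, List<IN> window) {
        ACC acc = fn.createAccumulator();
        for (IN value : window) {
            acc = fn.add(value, acc);
        }
        return fn.getResult(acc);
    }

    public static void main(String[] args) {
        List<Edit> window = Arrays.asList(
            new Edit("alice", 120L), new Edit("alice", -20L), new Edit("alice", 35L));
        UserBytes result = aggregateWindow(SUM_BYTES, window);
        System.out.println(result.user + " " + result.bytes); // prints "alice 135"
    }
}
```

In this sketch `merge` is never called, which matches the expectation that it mainly matters when windows themselves are merged (for example session windows) rather than for the tumbling windows used here.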
@@ -212,7 +227,7 @@ The complete code so far is this:
 {% highlight java %}
 package wikiedits;

-import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -240,13 +255,28 @@ public class WikipediaAnalysis {

     DataStream<Tuple2<String, Long>> result = keyedEdits
         .timeWindow(Time.seconds(5))
-        .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
+        .aggregate(new AggregateFunction<WikipediaEditEvent, Tuple2<String, Long>, Tuple2<String, Long>>() {
             @Override
-            public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
-                acc.f0 = event.getUser();
-                acc.f1 += event.getByteDiff();
-                return acc;
-            }
+            public Tuple2<String, Long> createAccumulator() {
+                return new Tuple2<>("", 0L);
+            }
+
+            @Override
+            public Tuple2<String, Long> add(WikipediaEditEvent value, Tuple2<String, Long> accumulator) {
+                accumulator.f0 = value.getUser();
+                accumulator.f1 += value.getByteDiff();
+                return accumulator;
+            }
+
+            @Override
+            public Tuple2<String, Long> getResult(Tuple2<String, Long> accumulator) {
+                return accumulator;
+            }
+
+            @Override
+            public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
+                return new Tuple2<>(a.f0, a.f1 + b.f1);
+            }
         });

     result.print();
@@ -368,9 +398,9 @@ The output of that command should look similar to this, if everything went accor
 03/08/2016 15:09:27 Job execution switched to status RUNNING.
 03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
 03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
-03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED
-03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
-03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to RUNNING
+03/08/2016 15:09:27 Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateFunction$3, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1) switched from CREATED to SCHEDULED
+03/08/2016 15:09:27 Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateFunction$3, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1) switched from SCHEDULED to DEPLOYING
+03/08/2016 15:09:27 Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateFunction$3, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1) switched from DEPLOYING to RUNNING
 03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING
 {% endhighlight %}
