Skip to content

Commit

Permalink
[FLINK-14068][streaming] Removes non-deprecated API that uses org.apa…
Browse files Browse the repository at this point in the history
…che.flink.streaming.api.windowing.time.Time;
  • Loading branch information
XComp committed Sep 29, 2024
1 parent 74864b0 commit d2ac6bd
Show file tree
Hide file tree
Showing 89 changed files with 593 additions and 1,286 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ val text = env.socketTextStream(host, port, '\n')
val windowCounts = text.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.keyBy("word")
.window(TumblingProcessingTimeWindow.of(Time.seconds(5)))
.window(TumblingProcessingTimeWindow.of(Duration.ofSeconds(5)))
.sum("count")

windowCounts.print()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ DataStream<MyEvent> withTimestampsAndWatermarks = stream

withTimestampsAndWatermarks
.keyBy( (event) -> event.getGroup() )
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(10)))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
```
Expand All @@ -144,7 +144,7 @@ val withTimestampsAndWatermarks: DataStream[MyEvent] = stream

withTimestampsAndWatermarks
.keyBy( _.getGroup )
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(10)))
.reduce( (a, b) => a.add(b) )
.addSink(...)
```
Expand All @@ -164,7 +164,7 @@ with_timestamp_and_watermarks = stream \

with_timestamp_and_watermarks \
.key_by(lambda e: e.get_group()) \
.window(TumblingEventTimeWindows.of(Time.seconds(10))) \
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(10))) \
.reduce(lambda a, b: a.add(b)) \
.add_sink(...)
```
Expand Down
10 changes: 5 additions & 5 deletions docs/content.zh/docs/dev/datastream/execution/parallel.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ DataStream<String> text = [...];
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.keyBy(value -> value.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(5)))
.sum(1).setParallelism(5);

wordCounts.print();
Expand All @@ -66,7 +66,7 @@ val text = [...]
val wordCounts = text
.flatMap{ _.split(" ") map { (_, 1) } }
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(5)))
.sum(1).setParallelism(5)
wordCounts.print()

Expand All @@ -82,7 +82,7 @@ word_counts = text
.flat_map(lambda x: x.split(" ")) \
.map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
.key_by(lambda i: i[0]) \
.window(TumblingEventTimeWindows.of(Time.seconds(5))) \
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(5))) \
.reduce(lambda i, j: (i[0], i[1] + j[1])) \
.set_parallelism(5)
word_counts.print()
Expand Down Expand Up @@ -121,7 +121,7 @@ val text = [...]
val wordCounts = text
.flatMap{ _.split(" ") map { (_, 1) } }
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(5)))
.sum(1)
wordCounts.print()

Expand All @@ -138,7 +138,7 @@ word_counts = text
.flat_map(lambda x: x.split(" ")) \
.map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
.key_by(lambda i: i[0]) \
.window(TumblingEventTimeWindows.of(Time.seconds(5))) \
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(5))) \
.reduce(lambda i, j: (i[0], i[1] + j[1]))
word_counts.print()

Expand Down
4 changes: 2 additions & 2 deletions docs/content.zh/docs/dev/datastream/experimental.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Code example:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> source = ...;
DataStreamUtils.reinterpretAsKeyedStream(source, (in) -> in, TypeInformation.of(Integer.class))
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(1)))
.reduce((a, b) -> a + b)
.addSink(new DiscardingSink<>());
env.execute();
Expand All @@ -76,7 +76,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val source = ...
new DataStreamUtils(source).reinterpretAsKeyedStream((in) => in)
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(1)))
.reduce((a, b) => a + b)
.addSink(new DiscardingSink[Int])
env.execute()
Expand Down
32 changes: 16 additions & 16 deletions docs/content.zh/docs/dev/datastream/operators/joining.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ stream.join(otherStream)
```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;

...

Expand All @@ -69,7 +69,7 @@ DataStream<Integer> greenStream = ...;
orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
.window(TumblingEventTimeWindows.of(Duration.ofMillis(2)))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
Expand All @@ -82,7 +82,7 @@ orangeStream.join(greenStream)

```scala
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import java.time.Duration

...

Expand All @@ -92,7 +92,7 @@ val greenStream: DataStream[Integer] = ...
orangeStream.join(greenStream)
.where(elem => /* select key */)
.equalTo(elem => /* select key */)
.window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
.window(TumblingEventTimeWindows.of(Duration.ofMillis(2)))
.apply { (e1, e2) => e1 + "," + e2 }
```

Expand All @@ -114,7 +114,7 @@ X 轴下方是每个滑动窗口中被 join 后传递给 `JoinFunction` 的元
```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;

...

Expand All @@ -124,7 +124,7 @@ DataStream<Integer> greenStream = ...;
orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
.window(SlidingEventTimeWindows.of(Duration.ofMillis(2) /* size */, Duration.ofMillis(1) /* slide */))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
Expand All @@ -137,7 +137,7 @@ orangeStream.join(greenStream)

```scala
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import java.time.Duration

...

Expand All @@ -147,7 +147,7 @@ val greenStream: DataStream[Integer] = ...
orangeStream.join(greenStream)
.where(elem => /* select key */)
.equalTo(elem => /* select key */)
.window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
.window(SlidingEventTimeWindows.of(Duration.ofMillis(2) /* size */, Duration.ofMillis(1) /* slide */))
.apply { (e1, e2) => e1 + "," + e2 }
```
{{< /tab >}}
Expand All @@ -167,7 +167,7 @@ orangeStream.join(greenStream)
```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;

...

Expand All @@ -177,7 +177,7 @@ DataStream<Integer> greenStream = ...;
orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
.window(EventTimeSessionWindows.withGap(Duration.ofMillis(1)))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
Expand All @@ -190,7 +190,7 @@ orangeStream.join(greenStream)

```scala
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import java.time.Duration

...

Expand All @@ -200,7 +200,7 @@ val greenStream: DataStream[Integer] = ...
orangeStream.join(greenStream)
.where(elem => /* select key */)
.equalTo(elem => /* select key */)
.window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
.window(EventTimeSessionWindows.withGap(Duration.ofMillis(1)))
.apply { (e1, e2) => e1 + "," + e2 }
```

Expand Down Expand Up @@ -240,7 +240,7 @@ Interval join 目前仅支持 event time。
```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;

...

Expand All @@ -250,7 +250,7 @@ DataStream<Integer> greenStream = ...;
orangeStream
.keyBy(<KeySelector>)
.intervalJoin(greenStream.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.between(Duration.ofMillis(-2), Duration.ofMillis(1))
.process (new ProcessJoinFunction<Integer, Integer, String>(){

@Override
Expand All @@ -265,7 +265,7 @@ orangeStream

```scala
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.windowing.time.Time
import java.time.Duration

...

Expand All @@ -275,7 +275,7 @@ val greenStream: DataStream[Integer] = ...
orangeStream
.keyBy(elem => /* select key */)
.intervalJoin(greenStream.keyBy(elem => /* select key */))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.between(Duration.ofMillis(-2), Duration.ofMillis(1))
.process(new ProcessJoinFunction[Integer, Integer, String] {
override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = {
out.collect(left + "," + right)
Expand Down
24 changes: 12 additions & 12 deletions docs/content.zh/docs/dev/datastream/operators/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,19 +198,19 @@ data_stream.key_by(lambda x: x[1]).reduce(lambda a, b: (a[0] + b[0], b[1]))
```java
dataStream
.keyBy(value -> value.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)));
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(5)));
```
{{< /tab >}}
{{< tab "Scala">}}
```scala
dataStream
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(5)))
```
{{< /tab >}}
{{< tab "Python" >}}
```python
data_stream.key_by(lambda x: x[1]).window(TumblingEventTimeWindows.of(Time.seconds(5)))
data_stream.key_by(lambda x: x[1]).window(TumblingEventTimeWindows.of(Duration.ofSeconds(5)))
```
{{< /tab >}}
{{< /tabs>}}
Expand All @@ -228,18 +228,18 @@ data_stream.key_by(lambda x: x[1]).window(TumblingEventTimeWindows.of(Time.secon
{{< tab "Java">}}
```java
dataStream
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
.windowAll(TumblingEventTimeWindows.of(Duration.ofSeconds(5)));
```
{{< /tab >}}
{{< tab "Scala">}}
```scala
dataStream
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.windowAll(TumblingEventTimeWindows.of(Duration.ofSeconds(5)))
```
{{< /tab >}}
{{< tab "Python" >}}
```python
data_stream.window_all(TumblingEventTimeWindows.of(Time.seconds(5)))
data_stream.window_all(TumblingEventTimeWindows.of(Duration.ofSeconds(5)))
```
{{< /tab >}}
{{< /tabs>}}
Expand Down Expand Up @@ -386,15 +386,15 @@ data_stream.union(otherStream1, otherStream2, ...)
```java
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(3)))
.apply (new JoinFunction () {...});
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(3)))
.apply { ... }
```
{{< /tab >}}
Expand All @@ -414,7 +414,7 @@ Python 中尚不支持此特性。
// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
.between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
.between(Duration.ofMillis(-2), Duration.ofMillis(2)) // lower and upper bound
.upperBoundExclusive(true) // optional
.lowerBoundExclusive(true) // optional
.process(new IntervalJoinFunction() {...});
Expand All @@ -425,7 +425,7 @@ keyedStream.intervalJoin(otherKeyedStream)
// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
.between(Time.milliseconds(-2), Time.milliseconds(2))
.between(Duration.ofMillis(-2), Duration.ofMillis(2))
// lower and upper bound
.upperBoundExclusive(true) // optional
.lowerBoundExclusive(true) // optional
Expand All @@ -447,15 +447,15 @@ Python 中尚不支持此特性。
```java
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(3)))
.apply (new CoGroupFunction () {...});
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(3)))
.apply {}
```
{{< /tab >}}
Expand Down
Loading

0 comments on commit d2ac6bd

Please sign in to comment.