Skip to content

Commit

Permalink
[FLINK-36355][runtime] Remove deprecated TimestampAssigner
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub authored and reswqa committed Oct 2, 2024
1 parent b2d7a90 commit a75df87
Show file tree
Hide file tree
Showing 25 changed files with 141 additions and 338 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.streaming.tests;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
Expand Down Expand Up @@ -429,8 +430,7 @@ static SourceFunction<Event> createEventSource(ParameterTool pt) {
SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.defaultValue()));
}

static BoundedOutOfOrdernessTimestampExtractor<Event> createTimestampExtractor(
ParameterTool pt) {
static WatermarkStrategy<Event> createWatermarkStrategy(ParameterTool pt) {
return new BoundedOutOfOrdernessTimestampExtractor<Event>(
Duration.ofMillis(
pt.getLong(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSlidingWindow;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSlidingWindowCheckMapper;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createWatermarkStrategy;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.isSimulateFailures;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
import static org.apache.flink.streaming.tests.TestOperatorEnum.EVENT_SOURCE;
Expand Down Expand Up @@ -94,7 +94,7 @@ public static void main(String[] args) throws Exception {
env.addSource(createEventSource(pt))
.name(EVENT_SOURCE.getName())
.uid(EVENT_SOURCE.getUid())
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
.assignTimestampsAndWatermarks(createWatermarkStrategy(pt))
.keyBy(Event::getKey)
.map(
createArtificialKeyedStateMapper(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.flink.util.ParameterTool;

import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.applyTumblingWindows;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createWatermarkStrategy;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
import static org.apache.flink.streaming.tests.TestOperatorEnum.EVENT_SOURCE;
import static org.apache.flink.streaming.tests.TestOperatorEnum.TIME_WINDOW_OPER;
Expand All @@ -61,7 +61,7 @@ public static void main(String[] args) throws Exception {
env.addSource(DataStreamAllroundTestJobFactory.createEventSource(pt))
.name(EVENT_SOURCE.getName())
.uid(EVENT_SOURCE.getUid())
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
.assignTimestampsAndWatermarks(createWatermarkStrategy(pt))
.keyBy(Event::getKey);

keyedStream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createWatermarkStrategy;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;

/**
Expand Down Expand Up @@ -94,7 +94,7 @@ private static void executeOriginalVariant(StreamExecutionEnvironment env, Param
env.addSource(createEventSource(pt))
.name("EventSource")
.uid("EventSource")
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
.assignTimestampsAndWatermarks(createWatermarkStrategy(pt))
.keyBy(Event::getKey);

List<TypeSerializer<ComplexPayload>> stateSer =
Expand All @@ -119,7 +119,7 @@ private static void executeUpgradedVariant(StreamExecutionEnvironment env, Param
env.addSource(createEventSource(pt))
.name("EventSource")
.uid("EventSource")
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
.assignTimestampsAndWatermarks(createWatermarkStrategy(pt))
.map(new UpgradeEvent())
.keyBy(UpgradedEvent::getKey);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPunctuatedWatermarks;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;
Expand Down Expand Up @@ -219,7 +219,8 @@ public void testSimplePatternEventTime() throws Exception {
// last element for high final watermark
Tuple2.of(new Event(5, "middle", 5.0), 100L))
.assignTimestampsAndWatermarks(
new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
new WatermarkStrategyWithPunctuatedWatermarks<
Tuple2<Event, Long>>() {

@Override
public long extractTimestamp(
Expand Down Expand Up @@ -300,7 +301,8 @@ public void testSimpleKeyedPatternEventTime() throws Exception {
Tuple2.of(new Event(3, "middle", 6.0), 9L),
Tuple2.of(new Event(3, "end", 7.0), 7L))
.assignTimestampsAndWatermarks(
new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
new WatermarkStrategyWithPunctuatedWatermarks<
Tuple2<Event, Long>>() {

@Override
public long extractTimestamp(
Expand Down Expand Up @@ -458,7 +460,8 @@ public void testTimeoutHandlingWithinFirstAndLast() throws Exception {
Tuple2.of(new Event(1, "start", 2.0), 4L),
Tuple2.of(new Event(1, "end", 2.0), 6L))
.assignTimestampsAndWatermarks(
new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
new WatermarkStrategyWithPunctuatedWatermarks<
Tuple2<Event, Long>>() {

@Override
public long extractTimestamp(
Expand Down Expand Up @@ -551,7 +554,8 @@ public void testTimeoutHandlingWithinPreviousAndCurrent() throws Exception {
Tuple2.of(new Event(1, "start", 2.0), 4L),
Tuple2.of(new Event(1, "end", 2.0), 6L))
.assignTimestampsAndWatermarks(
new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
new WatermarkStrategyWithPunctuatedWatermarks<
Tuple2<Event, Long>>() {

@Override
public long extractTimestamp(
Expand Down Expand Up @@ -714,7 +718,8 @@ public void testSimplePatternEventTimeWithComparator() throws Exception {
// last element for high final watermark
Tuple2.of(new Event(7, "middle", 5.0), 100L))
.assignTimestampsAndWatermarks(
new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
new WatermarkStrategyWithPunctuatedWatermarks<
Tuple2<Event, Long>>() {

@Override
public long extractTimestamp(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@
import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.OutputFormatSinkFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.PrintSinkFunction;
Expand Down Expand Up @@ -80,8 +78,6 @@
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
Expand Down Expand Up @@ -672,52 +668,6 @@ public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
return new SingleOutputStreamOperator<>(getExecutionEnvironment(), transformation);
}

/**
* Assigns timestamps to the elements in the data stream and periodically creates watermarks to
* signal event time progress.
*
* <p>This method uses the deprecated watermark generator interfaces. Please switch to {@link
* #assignTimestampsAndWatermarks(WatermarkStrategy)} to use the new interfaces instead. The new
* interfaces support watermark idleness and no longer need to differentiate between "periodic"
* and "punctuated" watermarks.
*
* @deprecated Please use {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} instead.
*/
@Deprecated
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {

final AssignerWithPeriodicWatermarks<T> cleanedAssigner =
clean(timestampAndWatermarkAssigner);
final WatermarkStrategy<T> wms =
new AssignerWithPeriodicWatermarksAdapter.Strategy<>(cleanedAssigner);

return assignTimestampsAndWatermarks(wms);
}

/**
* Assigns timestamps to the elements in the data stream and creates watermarks based on events,
* to signal event time progress.
*
* <p>This method uses the deprecated watermark generator interfaces. Please switch to {@link
* #assignTimestampsAndWatermarks(WatermarkStrategy)} to use the new interfaces instead. The new
* interfaces support watermark idleness and no longer need to differentiate between "periodic"
* and "punctuated" watermarks.
*
* @deprecated Please use {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} instead.
*/
@Deprecated
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
AssignerWithPunctuatedWatermarks<T> timestampAndWatermarkAssigner) {

final AssignerWithPunctuatedWatermarks<T> cleanedAssigner =
clean(timestampAndWatermarkAssigner);
final WatermarkStrategy<T> wms =
new AssignerWithPunctuatedWatermarksAdapter.Strategy<>(cleanedAssigner);

return assignTimestampsAndWatermarks(wms);
}

// ------------------------------------------------------------------------
// Data sinks
// ------------------------------------------------------------------------
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ public Collection<TimeWindow> assignWindows(
} else {
throw new RuntimeException(
"Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
+ "Is the time characteristic set to 'ProcessingTime', or did you forget to call "
+ "'DataStream.assignTimestampsAndWatermarks(...)'?");
+ "Did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ public Collection<TimeWindow> assignWindows(
} else {
throw new RuntimeException(
"Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
+ "Is the time characteristic set to 'ProcessingTime', or did you forget to call "
+ "'DataStream.assignTimestampsAndWatermarks(...)'?");
+ "Did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}

Expand Down

This file was deleted.

Loading

0 comments on commit a75df87

Please sign in to comment.