diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java index f7b36d2fcb0dd..62d28a77bae9d 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java @@ -80,6 +80,11 @@ protected BaseTwoInputStreamOperatorWithStateRetention( this.stateCleaningEnabled = minRetentionTime > 1; } + @Override + public boolean useSplittableTimers() { + return true; + } + @Override public void open() throws Exception { initializeTimerService(); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BaseTemporalSortOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BaseTemporalSortOperator.java index 355a246e9f4c8..a12e01036c46b 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BaseTemporalSortOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BaseTemporalSortOperator.java @@ -38,6 +38,11 @@ abstract class BaseTemporalSortOperator extends AbstractStreamOperator BaseTemporalSortOperator() {} + @Override + public boolean useSplittableTimers() { + return true; + } + @Override public void open() throws Exception { InternalTimerService internalTimerService = diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java index 8e91037537487..8a2c04bf36bc3 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java @@ -208,6 +208,11 @@ public abstract class WindowOperator extends AbstractStream this.recordCounter = RecordCounter.of(inputCountIndex); } + @Override + public boolean useSplittableTimers() { + return true; + } + WindowOperator( GroupWindowAssigner windowAssigner, Trigger trigger,