Skip to content

Commit

Permalink
[FLINK-36416][table][runtime] Enable splittable timers for temporal j…
Browse files Browse the repository at this point in the history
…oin, temporal sort and windowed aggregations

This is a follow up for https://cwiki.apache.org/confluence/display/FLINK/FLIP-443%3A+Interruptible+timers+firing . Temporal join, temporal sort and both windowed and table windowed aggregations in Table API/SQL can have large amount of registered/fired time, while at the same time enabling splittable timers shouldn't cause any side effects for those operators.
  • Loading branch information
pnowojski committed Oct 2, 2024
1 parent f9dfa18 commit 00a94ad
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ protected BaseTwoInputStreamOperatorWithStateRetention(
this.stateCleaningEnabled = minRetentionTime > 1;
}

@Override
public boolean useSplittableTimers() {
return true;
}

@Override
public void open() throws Exception {
initializeTimerService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ abstract class BaseTemporalSortOperator extends AbstractStreamOperator<RowData>

BaseTemporalSortOperator() {}

@Override
public boolean useSplittableTimers() {
return true;
}

@Override
public void open() throws Exception {
InternalTimerService<VoidNamespace> internalTimerService =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ public abstract class WindowOperator<K, W extends Window> extends AbstractStream
this.recordCounter = RecordCounter.of(inputCountIndex);
}

@Override
public boolean useSplittableTimers() {
return true;
}

WindowOperator(
GroupWindowAssigner<W> windowAssigner,
Trigger<W> trigger,
Expand Down

0 comments on commit 00a94ad

Please sign in to comment.