Skip to content

Commit

Permalink
[FLINK-34100][table] Add function getDescription for internal interfa…
Browse files Browse the repository at this point in the history
…ce WindowAssigner

This closes apache#24162
  • Loading branch information
xuyangzhong authored and lsyldliu committed Jan 25, 2024
1 parent 78e31f0 commit 65f0228
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.runtime.operators.window.tvf.common;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;

import java.io.Serializable;
Expand All @@ -31,11 +32,15 @@
*
* <p>See more details in {@link WindowOperatorBase}.
*/
@Internal
public interface WindowAssigner extends Serializable {

/**
* Returns {@code true} if elements are assigned to windows based on event time, {@code false}
* based on processing time.
*/
boolean isEventTime();

/** Returns a description of this window assigner. */
String getDescription();
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ public Iterable<Long> expiredSlices(long windowEnd) {
public long getSliceEndInterval() {
return size;
}

@Override
public String getDescription() {
return String.format("TumblingWindow(size=%dms, offset=%dms)", size, offset);
}
}

/** The {@link SliceAssigner} for hopping windows. */
Expand Down Expand Up @@ -278,6 +283,12 @@ public Optional<Long> nextTriggerWindow(long windowEnd, Supplier<Boolean> isWind
return Optional.of(windowEnd + sliceSize);
}
}

@Override
public String getDescription() {
return String.format(
"HoppingWindow(size=%dms, slide=%dms, offset=%dms)", size, slide, offset);
}
}

/** The {@link SliceAssigner} for cumulative windows. */
Expand Down Expand Up @@ -384,6 +395,13 @@ public Optional<Long> nextTriggerWindow(long windowEnd, Supplier<Boolean> isWind
return Optional.of(nextWindowEnd);
}
}

@Override
public String getDescription() {
return String.format(
"CumulativeWindow(maxSize=%dms, step=%dms, offset=%dms)",
maxSize, step, offset);
}
}

/**
Expand Down Expand Up @@ -438,6 +456,13 @@ public boolean isEventTime() {
// it always works in event-time mode if input row has been attached windows
return true;
}

@Override
public String getDescription() {
return String.format(
"WindowedSliceWindow(innerWindow=%s, windowEndIndex=%d)",
innerAssigner, windowEndIndex);
}
}

/**
Expand Down Expand Up @@ -469,6 +494,11 @@ public Optional<Long> nextTriggerWindow(long windowEnd, Supplier<Boolean> isWind
public long getLastWindowEnd(long sliceEnd) {
return innerAssigner.getLastWindowEnd(sliceEnd);
}

@Override
public String getDescription() {
return String.format("SlicedSharedSliceWindow(innerWindow=%s)", innerAssigner);
}
}

/**
Expand All @@ -491,6 +521,11 @@ public long getLastWindowEnd(long sliceEnd) {
// can't be shared with other windows and the last window should be itself.
return sliceEnd;
}

@Override
public String getDescription() {
return String.format("SlicedUnSharedSliceWindow(innerWindow=%s)", innerAssigner);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public static class SessionUnsliceAssigner implements UnsliceAssigner<TimeWindow
private static final long serialVersionUID = 1L;

private final int rowtimeIndex;
private final long sessionGap;
private final boolean isEventTime;
private final ZoneId shiftTimeZone;

Expand All @@ -69,6 +70,7 @@ public static class SessionUnsliceAssigner implements UnsliceAssigner<TimeWindow
public SessionUnsliceAssigner(int rowtimeIndex, ZoneId shiftTimeZone, long sessionGap) {
this.rowtimeIndex = rowtimeIndex;
this.shiftTimeZone = shiftTimeZone;
this.sessionGap = sessionGap;
this.isEventTime = rowtimeIndex >= 0;
this.innerSessionWindowAssigner =
SessionWindowAssigner.withGap(Duration.ofMillis(sessionGap));
Expand Down Expand Up @@ -138,5 +140,10 @@ protected long getUtcTimestamp(RowData element, ClockService clock) {
public boolean isEventTime() {
return isEventTime;
}

@Override
public String getDescription() {
return String.format("SessionWindow(gap=%dms)", sessionGap);
}
}
}

0 comments on commit 65f0228

Please sign in to comment.