Skip to content

Commit

Permalink
[FLINK-33752][Configuration] Change the displayed timeunit to day whe…
Browse files Browse the repository at this point in the history
…n the duration is an integral multiple of 1 day
  • Loading branch information
1996fanrui committed Dec 6, 2023
1 parent 9b61b13 commit 360abe6
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 31 deletions.
45 changes: 21 additions & 24 deletions flink-core/src/main/java/org/apache/flink/util/TimeUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -139,7 +138,19 @@ public static String getStringInMillis(final Duration duration) {
public static String formatWithHighestUnit(Duration duration) {
long nanos = duration.toNanos();

List<TimeUnit> orderedUnits =
TimeUnit highestIntegerUnit = getHighestIntegerUnit(nanos);
return String.format(
"%d %s",
nanos / highestIntegerUnit.unit.getDuration().toNanos(),
highestIntegerUnit.getLabels().get(0));
}

private static TimeUnit getHighestIntegerUnit(long nanos) {
if (nanos == 0) {
return TimeUnit.MILLISECONDS;
}

final List<TimeUnit> orderedUnits =
Arrays.asList(
TimeUnit.NANOSECONDS,
TimeUnit.MICROSECONDS,
Expand All @@ -149,29 +160,15 @@ public static String formatWithHighestUnit(Duration duration) {
TimeUnit.HOURS,
TimeUnit.DAYS);

TimeUnit highestIntegerUnit =
IntStream.range(0, orderedUnits.size())
.sequential()
.filter(
idx ->
nanos % orderedUnits.get(idx).unit.getDuration().toNanos()
!= 0)
.boxed()
.findFirst()
.map(
idx -> {
if (idx == 0) {
return orderedUnits.get(0);
} else {
return orderedUnits.get(idx - 1);
}
})
.orElse(TimeUnit.MILLISECONDS);
TimeUnit highestIntegerUnit = null;
for (TimeUnit timeUnit : orderedUnits) {
if (nanos % timeUnit.unit.getDuration().toNanos() != 0) {
break;
}
highestIntegerUnit = timeUnit;
}

return String.format(
"%d %s",
nanos / highestIntegerUnit.unit.getDuration().toNanos(),
highestIntegerUnit.getLabels().get(0));
return checkNotNull(highestIntegerUnit, "Should find a highestIntegerUnit.");
}

/** Enum which defines time unit, mostly used to parse value from configuration file. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.junit.jupiter.params.provider.MethodSource;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -38,7 +39,11 @@ private static Stream<Arguments> testDurationAndExpectedString() {
Arguments.of(Duration.ofMillis(200), "200 ms"),
Arguments.of(Duration.ofHours(1).plusSeconds(3), "3603 s"),
Arguments.of(Duration.ofSeconds(0), "0 ms"),
Arguments.of(Duration.ofMillis(60000), "1 min"));
Arguments.of(Duration.ofMillis(60000), "1 min"),
Arguments.of(Duration.ofHours(23), "23 h"),
Arguments.of(Duration.ofMillis(-1), "-1 ms"),
Arguments.of(Duration.ofMillis(TimeUnit.DAYS.toMillis(1)), "1 d"),
Arguments.of(Duration.ofHours(24), "1 d"));
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2386,10 +2386,10 @@ LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, window_start, window_end, EXPR$3])
+- WindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[86400000 ms], offset=[8 h])], select=[a, COUNT(*) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
+- WindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 d], offset=[8 h])], select=[a, COUNT(*) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, window_start, window_end])
+- WindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[rowtime], size=[86400000 ms], offset=[8 h])], select=[a, b, start('w$) AS window_start, end('w$) AS window_end])
+- WindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[rowtime], size=[1 d], offset=[8 h])], select=[a, b, start('w$) AS window_start, end('w$) AS window_end])
+- Exchange(distribution=[hash[a, b]])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, rowtime], metadata=[]]], fields=[a, b, c, rowtime])
Expand Down Expand Up @@ -2434,13 +2434,13 @@ LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, window_start, window_end, EXPR$3])
+- GlobalWindowAggregate(groupBy=[a], window=[TUMBLE(win_end=[$window_end], size=[86400000 ms], offset=[8 h])], select=[a, COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
+- GlobalWindowAggregate(groupBy=[a], window=[TUMBLE(win_end=[$window_end], size=[1 d], offset=[8 h])], select=[a, COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
+- Exchange(distribution=[hash[a]])
+- LocalWindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[86400000 ms], offset=[8 h])], select=[a, COUNT(*) AS count1$0, slice_end('w$) AS $window_end])
+- LocalWindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 d], offset=[8 h])], select=[a, COUNT(*) AS count1$0, slice_end('w$) AS $window_end])
+- Calc(select=[a, window_start, window_end])
+- GlobalWindowAggregate(groupBy=[a, b], window=[TUMBLE(slice_end=[$slice_end], size=[86400000 ms], offset=[8 h])], select=[a, b, start('w$) AS window_start, end('w$) AS window_end])
+- GlobalWindowAggregate(groupBy=[a, b], window=[TUMBLE(slice_end=[$slice_end], size=[1 d], offset=[8 h])], select=[a, b, start('w$) AS window_start, end('w$) AS window_end])
+- Exchange(distribution=[hash[a, b]])
+- LocalWindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[rowtime], size=[86400000 ms], offset=[8 h])], select=[a, b, slice_end('w$) AS $slice_end])
+- LocalWindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[rowtime], size=[1 d], offset=[8 h])], select=[a, b, slice_end('w$) AS $slice_end])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, rowtime], metadata=[]]], fields=[a, b, c, rowtime])
]]>
Expand Down

0 comments on commit 360abe6

Please sign in to comment.