Skip to content

Commit

Permalink
[FLINK-35820] Converting Duration to String fails for big values (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
dawidwys authored Jul 12, 2024
1 parent 2632c8f commit 98e07d1
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 21 deletions.
81 changes: 61 additions & 20 deletions flink-core/src/main/java/org/apache/flink/util/TimeUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.time.Time;

import java.math.BigInteger;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
Expand All @@ -39,6 +40,8 @@ public class TimeUtils {
private static final Map<String, ChronoUnit> LABEL_TO_UNIT_MAP =
Collections.unmodifiableMap(initMap());

private static final BigInteger NANOS_PER_SECOND = BigInteger.valueOf(1_000_000_000L);

/**
* Parse the given string to a java {@link Duration}. The string is in format "{length
* value}{time unit label}", e.g. "123ms", "321 s". If no time unit label is specified, it will
Expand Down Expand Up @@ -79,30 +82,45 @@ public static Duration parseDuration(String text) {
throw new NumberFormatException("text does not start with a number");
}

final long value;
final BigInteger value;
try {
value = Long.parseLong(number); // this throws a NumberFormatException on overflow
value = new BigInteger(number); // this throws a NumberFormatException
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"The value '"
+ number
+ "' cannot be re represented as 64bit number (numeric overflow).");
"The value '" + number + "' cannot be represented as an integer number.", e);
}

final ChronoUnit unit;
if (unitLabel.isEmpty()) {
return Duration.of(value, ChronoUnit.MILLIS);
}

ChronoUnit unit = LABEL_TO_UNIT_MAP.get(unitLabel);
if (unit != null) {
return Duration.of(value, unit);
unit = ChronoUnit.MILLIS;
} else {
unit = LABEL_TO_UNIT_MAP.get(unitLabel);
}
if (unit == null) {
throw new IllegalArgumentException(
"Time interval unit label '"
+ unitLabel
+ "' does not match any of the recognized units: "
+ TimeUnit.getAllUnits());
}

try {
return convertBigIntToDuration(value, unit);
} catch (ArithmeticException e) {
throw new IllegalArgumentException(
"The value '"
+ number
+ "' cannot be represented as java.time.Duration (numeric overflow).",
e);
}
}

private static Duration convertBigIntToDuration(BigInteger value, ChronoUnit unit) {
final BigInteger nanos = value.multiply(BigInteger.valueOf(unit.getDuration().toNanos()));

final BigInteger[] dividedAndRemainder = nanos.divideAndRemainder(NANOS_PER_SECOND);
return Duration.ofSeconds(dividedAndRemainder[0].longValueExact())
.plusNanos(dividedAndRemainder[1].longValueExact());
}

private static Map<String, ChronoUnit> initMap() {
Expand Down Expand Up @@ -136,17 +154,35 @@ public static String getStringInMillis(final Duration duration) {
* <b>NOTE:</b> It supports only durations that fit into long.
*/
public static String formatWithHighestUnit(Duration duration) {
long nanos = duration.toNanos();
BigInteger nanos = toNanos(duration);

TimeUnit highestIntegerUnit = getHighestIntegerUnit(nanos);
return String.format(
"%d %s",
nanos / highestIntegerUnit.unit.getDuration().toNanos(),
"%s %s",
nanos.divide(highestIntegerUnit.getUnitAsNanos()),
highestIntegerUnit.getLabels().get(0));
}

private static TimeUnit getHighestIntegerUnit(long nanos) {
if (nanos == 0) {
/**
* Converted from {@link Duration#toNanos()}, but produces {@link BigInteger} and does not throw
* an exception on overflow.
*/
private static BigInteger toNanos(Duration duration) {
long tempSeconds = duration.getSeconds();
long tempNanos = duration.getNano();
if (tempSeconds < 0) {
// change the seconds and nano value to
// handle Long.MIN_VALUE case
tempSeconds = tempSeconds + 1;
tempNanos = tempNanos - NANOS_PER_SECOND.longValue();
}
return BigInteger.valueOf(tempSeconds)
.multiply(NANOS_PER_SECOND)
.add(BigInteger.valueOf(tempNanos));
}

private static TimeUnit getHighestIntegerUnit(BigInteger nanos) {
if (nanos.compareTo(BigInteger.ZERO) == 0) {
return TimeUnit.MILLISECONDS;
}

Expand All @@ -162,7 +198,7 @@ private static TimeUnit getHighestIntegerUnit(long nanos) {

TimeUnit highestIntegerUnit = null;
for (TimeUnit timeUnit : orderedUnits) {
if (nanos % timeUnit.unit.getDuration().toNanos() != 0) {
if (nanos.remainder(timeUnit.getUnitAsNanos()).compareTo(BigInteger.ZERO) != 0) {
break;
}
highestIntegerUnit = timeUnit;
Expand All @@ -187,12 +223,13 @@ private enum TimeUnit {

private final ChronoUnit unit;

private final BigInteger unitAsNanos;

TimeUnit(ChronoUnit unit, String[]... labels) {
this.unit = unit;
this.unitAsNanos = BigInteger.valueOf(unit.getDuration().toNanos());
this.labels =
Arrays.stream(labels)
.flatMap(ls -> Arrays.stream(ls))
.collect(Collectors.toList());
Arrays.stream(labels).flatMap(Arrays::stream).collect(Collectors.toList());
}

/**
Expand All @@ -219,6 +256,10 @@ public ChronoUnit getUnit() {
return unit;
}

public BigInteger getUnitAsNanos() {
return unitAsNanos;
}

public static String getAllUnits() {
return Arrays.stream(TimeUnit.values())
.map(TimeUnit::createTimeUnitString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ private static Stream<Arguments> testDurationAndExpectedString() {
Arguments.of(Duration.ofHours(23), "23 h"),
Arguments.of(Duration.ofMillis(-1), "-1 ms"),
Arguments.of(Duration.ofMillis(TimeUnit.DAYS.toMillis(1)), "1 d"),
Arguments.of(Duration.ofHours(24), "1 d"));
Arguments.of(Duration.ofHours(24), "1 d"),
Arguments.of(Duration.ofMillis(9223372036854775807L), "9223372036854775807 ms"),
Arguments.of(
Duration.ofMillis(9223372036854775807L).plusNanos(1),
"9223372036854775807000001 ns"));
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ void testParseDurationNanos() {
assertThat(TimeUtils.parseDuration("424562nanosecond").getNano()).isEqualTo(424562);
assertThat(TimeUtils.parseDuration("424562nanoseconds").getNano()).isEqualTo(424562);
assertThat(TimeUtils.parseDuration("424562 ns").getNano()).isEqualTo(424562);
assertThat(TimeUtils.parseDuration("9223372036854775807000001 ns"))
.isEqualByComparingTo(Duration.ofMillis(9223372036854775807L).plusNanos(1));
}

@Test
Expand Down

0 comments on commit 98e07d1

Please sign in to comment.