Skip to content

Commit

Permalink
[FLINK-18638][runtime] Add optional timeout message to FutureUtils.or…
Browse files Browse the repository at this point in the history
…Timeout
  • Loading branch information
Tartarus0zm authored Jul 24, 2020
1 parent fc36235 commit d0ba79e
Showing 1 changed file with 40 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,21 @@ public RetryException(Throwable cause) {
* @return The timeout enriched future
*/
public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
return orTimeout(future, timeout, timeUnit, Executors.directExecutor());
return orTimeout(future, timeout, timeUnit, Executors.directExecutor(), null);
}

/**
* Times the given future out after the timeout.
*
* @param future to time out
* @param timeout after which the given future is timed out
* @param timeUnit time unit of the timeout
* @param timeoutMsg timeout message for exception
* @param <T> type of the given future
* @return The timeout enriched future
*/
public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit, @Nullable String timeoutMsg) {
return orTimeout(future, timeout, timeUnit, Executors.directExecutor(), timeoutMsg);
}

/**
Expand All @@ -439,10 +453,30 @@ public static <T> CompletableFuture<T> orTimeout(
long timeout,
TimeUnit timeUnit,
Executor timeoutFailExecutor) {
return orTimeout(future, timeout, timeUnit, timeoutFailExecutor, null);
}

/**
* Times the given future out after the timeout.
*
* @param future to time out
* @param timeout after which the given future is timed out
* @param timeUnit time unit of the timeout
* @param timeoutFailExecutor executor that will complete the future exceptionally after the timeout is reached
* @param timeoutMsg timeout message for exception
* @param <T> type of the given future
* @return The timeout enriched future
*/
public static <T> CompletableFuture<T> orTimeout(
CompletableFuture<T> future,
long timeout,
TimeUnit timeUnit,
Executor timeoutFailExecutor,
@Nullable String timeoutMsg) {

if (!future.isDone()) {
final ScheduledFuture<?> timeoutFuture = Delayer.delay(
() -> timeoutFailExecutor.execute(new Timeout(future)), timeout, timeUnit);
() -> timeoutFailExecutor.execute(new Timeout(future, timeoutMsg)), timeout, timeUnit);

future.whenComplete((T value, Throwable throwable) -> {
if (!timeoutFuture.isDone()) {
Expand Down Expand Up @@ -1026,14 +1060,16 @@ public static <T> T getWithoutException(CompletableFuture<T> future) {
private static final class Timeout implements Runnable {

private final CompletableFuture<?> future;
private final String timeoutMsg;

private Timeout(CompletableFuture<?> future) {
private Timeout(CompletableFuture<?> future, @Nullable String timeoutMsg) {
this.future = checkNotNull(future);
this.timeoutMsg = timeoutMsg;
}

@Override
public void run() {
future.completeExceptionally(new TimeoutException());
future.completeExceptionally(new TimeoutException(timeoutMsg));
}
}

Expand Down

0 comments on commit d0ba79e

Please sign in to comment.