Skip to content

Commit

Permalink
[FLINK-27431][rpc] Allow RpcTimeouts to be specified as Duration
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol authored Apr 28, 2022
1 parent fb00e1c commit a81609e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -338,10 +339,14 @@ private static Time extractRpcTimeout(
if (isRpcTimeout(parameterAnnotations[i])) {
if (args[i] instanceof Time) {
return (Time) args[i];
} else if (args[i] instanceof Duration) {
return Time.fromDuration((Duration) args[i]);
} else {
throw new RuntimeException(
"The rpc timeout parameter must be of type "
+ Time.class.getName()
+ " or "
+ Duration.class.getName()
+ ". The type "
+ args[i].getClass().getName()
+ " is not supported.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down Expand Up @@ -76,10 +78,20 @@ void stopTestEndpoints() {
}

@Test
void testTimeoutException() throws Exception {
void testTimeoutExceptionWithTime() throws Exception {
testTimeoutException(gateway -> gateway.callThatTimesOut(Time.milliseconds(1)));
}

@Test
void testTimeoutExceptionWithDuration() throws Exception {
testTimeoutException(gateway -> gateway.callThatTimesOut(Duration.ofMillis(1)));
}

private void testTimeoutException(
Function<TestingGateway, CompletableFuture<Void>> timeoutOperation) throws Exception {
final TestingGateway gateway = createTestingGateway();

final CompletableFuture<Void> future = gateway.callThatTimesOut(Time.milliseconds(1));
final CompletableFuture<Void> future = timeoutOperation.apply(gateway);

assertThatThrownBy(future::get)
.hasCauseInstanceOf(TimeoutException.class)
Expand Down Expand Up @@ -108,6 +120,8 @@ private TestingGateway createTestingGateway() throws Exception {
private interface TestingGateway extends RpcGateway {

CompletableFuture<Void> callThatTimesOut(@RpcTimeout Time timeout);

CompletableFuture<Void> callThatTimesOut(@RpcTimeout Duration timeout);
}

private static final class TestingRpcEndpoint extends RpcEndpoint implements TestingGateway {
Expand All @@ -121,5 +135,11 @@ public CompletableFuture<Void> callThatTimesOut(@RpcTimeout Time timeout) {
// return a future that never completes, so the call is guaranteed to time out
return new CompletableFuture<>();
}

@Override
public CompletableFuture<Void> callThatTimesOut(@RpcTimeout Duration timeout) {
// return a future that never completes, so the call is guaranteed to time out
return new CompletableFuture<>();
}
}
}

0 comments on commit a81609e

Please sign in to comment.