Skip to content

Commit

Permalink
[FLINK-31995][tests] Adds shutdown check to DirectExecutorService
Browse files Browse the repository at this point in the history
This change is added to make DirectExecutorService match the contract
of ExecutorService more closely in terms of RejectedExecutionExceptions.

Signed-off-by: Matthias Pohl <[email protected]>
  • Loading branch information
XComp committed May 4, 2023
1 parent 7cdbfce commit 8d430f5
Showing 1 changed file with 22 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand Down Expand Up @@ -72,6 +73,8 @@ public boolean awaitTermination(long timeout, @Nonnull TimeUnit unit) {
@Override
@Nonnull
public <T> Future<T> submit(@Nonnull Callable<T> task) {
throwRejectedExecutionExceptionIfShutdown();

try {
T result = task.call();

Expand All @@ -84,6 +87,8 @@ public <T> Future<T> submit(@Nonnull Callable<T> task) {
@Override
@Nonnull
public <T> Future<T> submit(@Nonnull Runnable task, T result) {
throwRejectedExecutionExceptionIfShutdown();

task.run();

return new CompletedFuture<>(result, null);
Expand All @@ -92,13 +97,17 @@ public <T> Future<T> submit(@Nonnull Runnable task, T result) {
@Override
@Nonnull
public Future<?> submit(@Nonnull Runnable task) {
throwRejectedExecutionExceptionIfShutdown();

task.run();
return new CompletedFuture<>(null, null);
}

@Override
@Nonnull
public <T> List<Future<T>> invokeAll(@Nonnull Collection<? extends Callable<T>> tasks) {
throwRejectedExecutionExceptionIfShutdown();

ArrayList<Future<T>> result = new ArrayList<>();

for (Callable<T> task : tasks) {
Expand All @@ -117,6 +126,7 @@ public <T> List<Future<T>> invokeAll(
@Nonnull Collection<? extends Callable<T>> tasks,
long timeout,
@Nonnull TimeUnit unit) {
throwRejectedExecutionExceptionIfShutdown();

long end = System.currentTimeMillis() + unit.toMillis(timeout);
Iterator<? extends Callable<T>> iterator = tasks.iterator();
Expand Down Expand Up @@ -170,6 +180,8 @@ public T get(long timeout, @Nonnull TimeUnit unit) {
@Nonnull
public <T> T invokeAny(@Nonnull Collection<? extends Callable<T>> tasks)
throws ExecutionException {
throwRejectedExecutionExceptionIfShutdown();

Exception exception = null;

for (Callable<T> task : tasks) {
Expand All @@ -188,6 +200,7 @@ public <T> T invokeAny(@Nonnull Collection<? extends Callable<T>> tasks)
public <T> T invokeAny(
@Nonnull Collection<? extends Callable<T>> tasks, long timeout, @Nonnull TimeUnit unit)
throws ExecutionException, TimeoutException {
throwRejectedExecutionExceptionIfShutdown();

long end = System.currentTimeMillis() + unit.toMillis(timeout);
Exception exception = null;
Expand All @@ -214,9 +227,18 @@ public <T> T invokeAny(

@Override
public void execute(@Nonnull Runnable command) {
throwRejectedExecutionExceptionIfShutdown();

command.run();
}

private void throwRejectedExecutionExceptionIfShutdown() {
if (isShutdown()) {
throw new RejectedExecutionException(
"The ExecutorService is shut down already. No Callables can be executed.");
}
}

static class CompletedFuture<V> implements Future<V> {
private final V value;
private final Exception exception;
Expand Down

0 comments on commit 8d430f5

Please sign in to comment.