Skip to content

Commit

Permalink
Allow MoreFutures.allAsList/allAsListWithExceptions to have the passe…
Browse files Browse the repository at this point in the history
…d in list to be mutated (apache#23811)

This resolves ConcurrentModificationExceptions seen within WriteFiles and other places that use MoreFutures.allAsList* methods.

Fixes apache#23809
  • Loading branch information
lukecwik authored Oct 24, 2022
1 parent 226bc97 commit 75fa82c
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.auto.value.AutoValue;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -161,17 +162,13 @@ public static CompletionStage<Void> runAsync(ThrowingRunnable runnable) {
/** Like {@link CompletableFuture#allOf} but returning the result of constituent futures. */
public static <T> CompletionStage<List<T>> allAsList(
Collection<? extends CompletionStage<? extends T>> futures) {

// CompletableFuture.allOf completes exceptionally if any of the futures do.
// We have to gather the results separately.
CompletionStage<Void> blockAndDiscard =
CompletableFuture.allOf(futuresToCompletableFutures(futures));
CompletableFuture<? extends T>[] f = futuresToCompletableFutures(futures);
CompletionStage<Void> blockAndDiscard = CompletableFuture.allOf(f);

return blockAndDiscard.thenApply(
nothing ->
futures.stream()
.map(future -> future.toCompletableFuture().join())
.collect(Collectors.toList()));
nothing -> Arrays.stream(f).map(CompletableFuture::join).collect(Collectors.toList()));
}

/**
Expand Down Expand Up @@ -207,25 +204,25 @@ public static <T> ExceptionOrResult<T> result(T result) {
}
}

/** Like {@link #allAsList} but return a list . */
/**
* Like {@link #allAsList} but return a list of {@link ExceptionOrResult} of constituent futures.
*/
public static <T> CompletionStage<List<ExceptionOrResult<T>>> allAsListWithExceptions(
Collection<? extends CompletionStage<? extends T>> futures) {

// CompletableFuture.allOf completes exceptionally if any of the futures do.
// We have to gather the results separately.
CompletionStage<Void> blockAndDiscard =
CompletableFuture.allOf(futuresToCompletableFutures(futures))
.whenComplete((ignoredValues, arbitraryException) -> {});
CompletableFuture<? extends T>[] f = futuresToCompletableFutures(futures);
CompletionStage<Void> blockAndDiscard = CompletableFuture.allOf(f);

return blockAndDiscard.thenApply(
nothing ->
futures.stream()
Arrays.stream(f)
.map(
future -> {
// The limited scope of the exceptions wrapped allows CancellationException
// to still be thrown.
try {
return ExceptionOrResult.<T>result(future.toCompletableFuture().join());
return ExceptionOrResult.<T>result(future.join());
} catch (CompletionException exc) {
return ExceptionOrResult.<T>exception(exc);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.util.MoreFutures.ExceptionOrResult;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
Expand Down Expand Up @@ -84,4 +90,72 @@ public void runAsyncFailure() throws Exception {
thrown.expectMessage(testMessage);
MoreFutures.get(sideEffectFuture);
}

@Test
public void testAllAsListRespectsOriginalList() throws Exception {
CountDownLatch waitTillThreadRunning = new CountDownLatch(1);
CountDownLatch waitTillClearHasHappened = new CountDownLatch(1);
List<CompletionStage<Void>> stages = new ArrayList<>();
stages.add(MoreFutures.runAsync(waitTillThreadRunning::countDown));
stages.add(MoreFutures.runAsync(waitTillClearHasHappened::await));

CompletionStage<List<Void>> results = MoreFutures.allAsList(stages);
waitTillThreadRunning.await();
stages.clear();
waitTillClearHasHappened.countDown();
assertEquals(MoreFutures.get(results), Arrays.asList(null, null));
}

@Test
public void testAllAsListNoExceptionDueToMutation() throws Exception {
// This loop runs many times trying to exercise a race condition that existed where mutation
// of the passed in completion stages lead to various exceptions (such as a
// ConcurrentModificationException). See https://github.com/apache/beam/issues/23809
for (int i = 0; i < 10000; ++i) {
CountDownLatch waitTillThreadRunning = new CountDownLatch(1);
List<CompletionStage<Void>> stages = new ArrayList<>();
stages.add(MoreFutures.runAsync(waitTillThreadRunning::countDown));

CompletionStage<List<Void>> results = MoreFutures.allAsList(stages);
waitTillThreadRunning.await();
stages.clear();
MoreFutures.get(results);
}
}

@Test
public void testAllAsListWithExceptionsRespectsOriginalList() throws Exception {
CountDownLatch waitTillThreadRunning = new CountDownLatch(1);
CountDownLatch waitTillClearHasHappened = new CountDownLatch(1);
List<CompletionStage<Void>> stages = new ArrayList<>();
stages.add(MoreFutures.runAsync(waitTillThreadRunning::countDown));
stages.add(MoreFutures.runAsync(waitTillClearHasHappened::await));

CompletionStage<List<ExceptionOrResult<Void>>> results =
MoreFutures.allAsListWithExceptions(stages);
waitTillThreadRunning.await();
stages.clear();
waitTillClearHasHappened.countDown();
assertEquals(
MoreFutures.get(results),
Arrays.asList(ExceptionOrResult.result(null), ExceptionOrResult.result(null)));
}

@Test
public void testAllAsListWithExceptionsNoExceptionDueToMutation() throws Exception {
// This loop runs many times trying to exercise a race condition that existed where mutation
// of the passed in completion stages lead to various exceptions (such as a
// ConcurrentModificationException). See https://github.com/apache/beam/issues/23809
for (int i = 0; i < 10000; ++i) {
CountDownLatch waitTillThreadRunning = new CountDownLatch(1);
List<CompletionStage<Void>> stages = new ArrayList<>();
stages.add(MoreFutures.runAsync(waitTillThreadRunning::countDown));

CompletionStage<List<ExceptionOrResult<Void>>> results =
MoreFutures.allAsListWithExceptions(stages);
waitTillThreadRunning.await();
stages.clear();
MoreFutures.get(results);
}
}
}

0 comments on commit 75fa82c

Please sign in to comment.