Skip to content

Commit

Permalink
[FLINK-26252][tests] Refactor MiniClusterExtension to support JUnit 5…
Browse files Browse the repository at this point in the history
… parallel tests
  • Loading branch information
slinkydeveloper authored and zentol committed Feb 24, 2022
1 parent 576354c commit 8cdd0b8
Show file tree
Hide file tree
Showing 28 changed files with 943 additions and 328 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.lang.annotation.Annotation;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

import static com.tngtech.archunit.lang.conditions.ArchPredicates.is;
import static org.apache.flink.architecture.common.JavaFieldPredicates.annotatedWith;
Expand Down Expand Up @@ -76,11 +77,26 @@ public Set<JavaField> apply(JavaClass input) {
* <p>Attention: changing the description will add a rule into the stored.rules.
*/
public static DescribedPredicate<JavaField> arePublicStaticOfType(Class<?> clazz) {
return areFieldOfType(clazz, JavaModifier.PUBLIC, JavaModifier.STATIC);
}

/**
* Tests that the given field is of the given type {@code clazz} and has given modifiers.
*
* <p>Attention: changing the description will add a rule into the stored.rules.
*/
public static DescribedPredicate<JavaField> areFieldOfType(
Class<?> clazz, JavaModifier... modifiers) {
return DescribedPredicate.describe(
"are public, static, and of type " + clazz.getSimpleName(),
String.format(
"are %s, and of type %s",
Arrays.stream(modifiers)
.map(JavaModifier::toString)
.map(String::toLowerCase)
.collect(Collectors.joining(", ")),
clazz.getSimpleName()),
field ->
field.getModifiers().contains(JavaModifier.PUBLIC)
&& field.getModifiers().contains(JavaModifier.STATIC)
field.getModifiers().containsAll(Arrays.asList(modifiers))
&& field.getRawType().isEquivalentTo(clazz));
}

Expand Down Expand Up @@ -126,6 +142,34 @@ public static DescribedPredicate<JavaField> arePublicStaticFinalOfTypeWithAnnota
return arePublicStaticFinalOfType(clazz).and(annotatedWith(annotationType));
}

/**
* Tests that the given field is {@code static final} and of the given type {@code clazz} with
* exactly the given {@code annotationType}. It doesn't matter if public, private or protected.
*/
public static DescribedPredicate<JavaField> areStaticFinalOfTypeWithAnnotation(
Class<?> clazz, Class<? extends Annotation> annotationType) {
return areFieldOfType(clazz, JavaModifier.STATIC, JavaModifier.FINAL)
.and(annotatedWith(annotationType));
}

/**
* Returns a {@link DescribedPredicate} that returns true if one and only one of the given
* predicates match.
*/
@SafeVarargs
public static <T> DescribedPredicate<T> exactlyOneOf(
final DescribedPredicate<? super T>... other) {
return DescribedPredicate.describe(
"only one of the following predicates match:\n"
+ Arrays.stream(other)
.map(dp -> "* " + dp + "\n")
.collect(Collectors.joining()),
t ->
Arrays.stream(other)
.map(dp -> dp.apply(t))
.reduce(false, Boolean::logicalXor));
}

private Predicates() {}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@

package org.apache.flink.architecture.rules;

import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.runtime.testutils.MiniClusterExtension;
import org.apache.flink.architecture.common.Predicates;
import org.apache.flink.runtime.testutils.InternalMiniClusterExtension;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.MiniClusterWithClientResource;

Expand All @@ -29,15 +30,19 @@
import com.tngtech.archunit.lang.ArchRule;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.util.Arrays;

import static com.tngtech.archunit.core.domain.JavaModifier.ABSTRACT;
import static com.tngtech.archunit.library.freeze.FreezingArchRule.freeze;
import static org.apache.flink.architecture.common.Conditions.fulfill;
import static org.apache.flink.architecture.common.GivenJavaClasses.javaClassesThat;
import static org.apache.flink.architecture.common.Predicates.arePublicFinalOfTypeWithAnnotation;
import static org.apache.flink.architecture.common.Predicates.arePublicStaticFinalAssignableTo;
import static org.apache.flink.architecture.common.Predicates.arePublicStaticFinalOfTypeWithAnnotation;
import static org.apache.flink.architecture.common.Predicates.areStaticFinalOfTypeWithAnnotation;
import static org.apache.flink.architecture.common.Predicates.containAnyFieldsInClassHierarchyThat;

/** Rules for Integration Tests. */
Expand All @@ -61,18 +66,21 @@ public class ITCaseRules {
* <p>1. For JUnit 5 test, both fields are required like:
*
* <pre>{@code
* @RegisterExtension
* public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
* new MiniClusterExtension(
* new MiniClusterResourceConfiguration.Builder()
* .setConfiguration(getFlinkConfiguration())
* .build());
* }</pre>
*
* @RegisterExtension
* public static AllCallbackWrapper allCallbackWrapper =
* new AllCallbackWrapper(MINI_CLUSTER_RESOURCE);
* <p>2. For JUnit 5 test, use {@link ExtendWith}:
*
* <pre>{@code
* @ExtendWith(MiniClusterExtension.class)
* }</pre>
*
* <p>2. For JUnit 4 test via @Rule like:
* <p>3. For JUnit 4 test via @Rule like:
*
* <pre>{@code
* @Rule
Expand All @@ -86,7 +94,7 @@ public class ITCaseRules {
* .build());
* }</pre>
*
* <p>3. For JUnit 4 test via @ClassRule like:
* <p>4. For JUnit 4 test via @ClassRule like:
*
* <pre>{@code
* @ClassRule
Expand All @@ -110,7 +118,6 @@ public class ITCaseRules {
fulfill(
// JUnit 5 violation check
miniClusterExtensionRule()
.and(allCallbackWrapper())
// JUnit 4 violation check, which should
// be
// removed
Expand All @@ -135,14 +142,46 @@ private static DescribedPredicate<JavaClass> miniClusterWithClientResourceRule()
MiniClusterWithClientResource.class, Rule.class));
}

private static DescribedPredicate<JavaClass> inFlinkRuntimePackages() {
return JavaClass.Predicates.resideInAPackage("org.apache.flink.runtime.*");
}

private static DescribedPredicate<JavaClass> outsideFlinkRuntimePackages() {
return JavaClass.Predicates.resideOutsideOfPackage("org.apache.flink.runtime.*");
}

private static DescribedPredicate<JavaClass> miniClusterExtensionRule() {
return containAnyFieldsInClassHierarchyThat(
arePublicStaticFinalAssignableTo(MiniClusterExtension.class));
// Only flink-runtime should use InternalMiniClusterExtension,
// other packages should use MiniClusterExtension
return Predicates.exactlyOneOf(
inFlinkRuntimePackages()
.and(
containAnyFieldsInClassHierarchyThat(
areStaticFinalOfTypeWithAnnotation(
InternalMiniClusterExtension.class,
RegisterExtension.class))),
outsideFlinkRuntimePackages()
.and(
containAnyFieldsInClassHierarchyThat(
areStaticFinalOfTypeWithAnnotation(
MiniClusterExtension.class,
RegisterExtension.class))),
inFlinkRuntimePackages()
.and(
isAnnotatedWithExtendWithUsingExtension(
InternalMiniClusterExtension.class)),
outsideFlinkRuntimePackages()
.and(isAnnotatedWithExtendWithUsingExtension(MiniClusterExtension.class)));
}

private static DescribedPredicate<JavaClass> allCallbackWrapper() {
return containAnyFieldsInClassHierarchyThat(
arePublicStaticFinalOfTypeWithAnnotation(
AllCallbackWrapper.class, RegisterExtension.class));
private static DescribedPredicate<JavaClass> isAnnotatedWithExtendWithUsingExtension(
Class<? extends Extension> extensionClass) {
return DescribedPredicate.describe(
"is annotated with @ExtendWith with class " + extensionClass.getSimpleName(),
clazz ->
clazz.isAnnotatedWith(ExtendWith.class)
&& Arrays.asList(
clazz.getAnnotationOfType(ExtendWith.class).value())
.contains(extensionClass));
}
}
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
Original file line number Diff line number Diff line change
@@ -1,2 +1,12 @@
org.apache.flink.connector.base.sink.AsyncSinkBaseITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.connector.base.source.reader.CoordinatedSourceRescaleITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.connector.base.sink.AsyncSinkBaseITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.connector.base.source.reader.CoordinatedSourceRescaleITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.jobgraph.JobGraph;
Expand All @@ -39,7 +38,8 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.test.util.MiniClusterWithClientExtension;
import org.apache.flink.test.junit5.InjectMiniCluster;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.testutils.logging.LoggerAuditingExtension;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -71,21 +71,17 @@ public class AlignedWatermarksITCase {

private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();

public static final MiniClusterWithClientExtension MINI_CLUSTER_RESOURCE =
new MiniClusterWithClientExtension(
@RegisterExtension
private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setConfiguration(reporter.addToConfiguration(new Configuration()))
.build());

@RegisterExtension
public static final AllCallbackWrapper<MiniClusterWithClientExtension> ALL_WRAPPER =
new AllCallbackWrapper<>(MINI_CLUSTER_RESOURCE);

@Test
public void testAlignment() throws Exception {
public void testAlignment(@InjectMiniCluster MiniCluster miniCluster) throws Exception {
final JobGraph jobGraph = getJobGraph();
final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
final CompletableFuture<JobSubmissionResult> submission = miniCluster.submitJob(jobGraph);
final JobID jobID = submission.get().getJobID();
CommonTestUtils.waitForAllTaskRunning(miniCluster, jobID, false);
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
Loading

0 comments on commit 8cdd0b8

Please sign in to comment.