diff --git a/flink-architecture-tests/flink-architecture-tests-base/src/main/java/org/apache/flink/architecture/common/Predicates.java b/flink-architecture-tests/flink-architecture-tests-base/src/main/java/org/apache/flink/architecture/common/Predicates.java index 56e6c1160f053..d45cb56b5a380 100644 --- a/flink-architecture-tests/flink-architecture-tests-base/src/main/java/org/apache/flink/architecture/common/Predicates.java +++ b/flink-architecture-tests/flink-architecture-tests-base/src/main/java/org/apache/flink/architecture/common/Predicates.java @@ -29,6 +29,7 @@ import java.lang.annotation.Annotation; import java.util.Arrays; import java.util.Set; +import java.util.stream.Collectors; import static com.tngtech.archunit.lang.conditions.ArchPredicates.is; import static org.apache.flink.architecture.common.JavaFieldPredicates.annotatedWith; @@ -76,11 +77,26 @@ public Set apply(JavaClass input) { *

Attention: changing the description will add a rule into the stored.rules. */ public static DescribedPredicate arePublicStaticOfType(Class clazz) { + return areFieldOfType(clazz, JavaModifier.PUBLIC, JavaModifier.STATIC); + } + + /** + * Tests that the given field is of the given type {@code clazz} and has given modifiers. + * + *

Attention: changing the description will add a rule into the stored.rules. + */ + public static DescribedPredicate areFieldOfType( + Class clazz, JavaModifier... modifiers) { return DescribedPredicate.describe( - "are public, static, and of type " + clazz.getSimpleName(), + String.format( + "are %s, and of type %s", + Arrays.stream(modifiers) + .map(JavaModifier::toString) + .map(String::toLowerCase) + .collect(Collectors.joining(", ")), + clazz.getSimpleName()), field -> - field.getModifiers().contains(JavaModifier.PUBLIC) - && field.getModifiers().contains(JavaModifier.STATIC) + field.getModifiers().containsAll(Arrays.asList(modifiers)) && field.getRawType().isEquivalentTo(clazz)); } @@ -126,6 +142,34 @@ public static DescribedPredicate arePublicStaticFinalOfTypeWithAnnota return arePublicStaticFinalOfType(clazz).and(annotatedWith(annotationType)); } + /** + * Tests that the given field is {@code static final} and of the given type {@code clazz} with + * exactly the given {@code annotationType}. It doesn't matter if public, private or protected. + */ + public static DescribedPredicate areStaticFinalOfTypeWithAnnotation( + Class clazz, Class annotationType) { + return areFieldOfType(clazz, JavaModifier.STATIC, JavaModifier.FINAL) + .and(annotatedWith(annotationType)); + } + + /** + * Returns a {@link DescribedPredicate} that returns true if one and only one of the given + * predicates match. + */ + @SafeVarargs + public static DescribedPredicate exactlyOneOf( + final DescribedPredicate... other) { + return DescribedPredicate.describe( + "only one of the following predicates match:\n" + + Arrays.stream(other) + .map(dp -> "* " + dp + "\n") + .collect(Collectors.joining()), + t -> + Arrays.stream(other) + .map(dp -> dp.apply(t)) + .reduce(false, Boolean::logicalXor)); + } + private Predicates() {} /** diff --git a/flink-architecture-tests/flink-architecture-tests-test/src/main/java/org/apache/flink/architecture/rules/ITCaseRules.java b/flink-architecture-tests/flink-architecture-tests-test/src/main/java/org/apache/flink/architecture/rules/ITCaseRules.java index b9afc4d7c3d6d..e679f5cce0a92 100644 --- a/flink-architecture-tests/flink-architecture-tests-test/src/main/java/org/apache/flink/architecture/rules/ITCaseRules.java +++ b/flink-architecture-tests/flink-architecture-tests-test/src/main/java/org/apache/flink/architecture/rules/ITCaseRules.java @@ -18,8 +18,9 @@ package org.apache.flink.architecture.rules; -import org.apache.flink.core.testutils.AllCallbackWrapper; -import org.apache.flink.runtime.testutils.MiniClusterExtension; +import org.apache.flink.architecture.common.Predicates; +import org.apache.flink.runtime.testutils.InternalMiniClusterExtension; +import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.test.util.MiniClusterWithClientResource; @@ -29,15 +30,19 @@ import com.tngtech.archunit.lang.ArchRule; import org.junit.ClassRule; import org.junit.Rule; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.Extension; import org.junit.jupiter.api.extension.RegisterExtension; +import java.util.Arrays; + import static com.tngtech.archunit.core.domain.JavaModifier.ABSTRACT; import static com.tngtech.archunit.library.freeze.FreezingArchRule.freeze; import static org.apache.flink.architecture.common.Conditions.fulfill; import static org.apache.flink.architecture.common.GivenJavaClasses.javaClassesThat; import static org.apache.flink.architecture.common.Predicates.arePublicFinalOfTypeWithAnnotation; -import static org.apache.flink.architecture.common.Predicates.arePublicStaticFinalAssignableTo; import static org.apache.flink.architecture.common.Predicates.arePublicStaticFinalOfTypeWithAnnotation; +import static org.apache.flink.architecture.common.Predicates.areStaticFinalOfTypeWithAnnotation; import static org.apache.flink.architecture.common.Predicates.containAnyFieldsInClassHierarchyThat; /** Rules for Integration Tests. */ @@ -61,18 +66,21 @@ public class ITCaseRules { *

1. For JUnit 5 test, both fields are required like: * *

{@code
+     * @RegisterExtension
      * public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
      *         new MiniClusterExtension(
      *                 new MiniClusterResourceConfiguration.Builder()
      *                         .setConfiguration(getFlinkConfiguration())
      *                         .build());
+     * }
* - * @RegisterExtension - * public static AllCallbackWrapper allCallbackWrapper = - * new AllCallbackWrapper(MINI_CLUSTER_RESOURCE); + *

2. For JUnit 5 test, use {@link ExtendWith}: + * + *

{@code
+     * @ExtendWith(MiniClusterExtension.class)
      * }
* - *

2. For JUnit 4 test via @Rule like: + *

3. For JUnit 4 test via @Rule like: * *

{@code
      * @Rule
@@ -86,7 +94,7 @@ public class ITCaseRules {
      *                          .build());
      * }
* - *

3. For JUnit 4 test via @ClassRule like: + *

4. For JUnit 4 test via @ClassRule like: * *

{@code
      * @ClassRule
@@ -110,7 +118,6 @@ public class ITCaseRules {
                                             fulfill(
                                                     // JUnit 5 violation check
                                                     miniClusterExtensionRule()
-                                                            .and(allCallbackWrapper())
                                                             // JUnit 4 violation check, which should
                                                             // be
                                                             // removed
@@ -135,14 +142,46 @@ private static DescribedPredicate miniClusterWithClientResourceRule()
                         MiniClusterWithClientResource.class, Rule.class));
     }
 
+    private static DescribedPredicate inFlinkRuntimePackages() {
+        return JavaClass.Predicates.resideInAPackage("org.apache.flink.runtime.*");
+    }
+
+    private static DescribedPredicate outsideFlinkRuntimePackages() {
+        return JavaClass.Predicates.resideOutsideOfPackage("org.apache.flink.runtime.*");
+    }
+
     private static DescribedPredicate miniClusterExtensionRule() {
-        return containAnyFieldsInClassHierarchyThat(
-                arePublicStaticFinalAssignableTo(MiniClusterExtension.class));
+        // Only flink-runtime should use InternalMiniClusterExtension,
+        // other packages should use MiniClusterExtension
+        return Predicates.exactlyOneOf(
+                inFlinkRuntimePackages()
+                        .and(
+                                containAnyFieldsInClassHierarchyThat(
+                                        areStaticFinalOfTypeWithAnnotation(
+                                                InternalMiniClusterExtension.class,
+                                                RegisterExtension.class))),
+                outsideFlinkRuntimePackages()
+                        .and(
+                                containAnyFieldsInClassHierarchyThat(
+                                        areStaticFinalOfTypeWithAnnotation(
+                                                MiniClusterExtension.class,
+                                                RegisterExtension.class))),
+                inFlinkRuntimePackages()
+                        .and(
+                                isAnnotatedWithExtendWithUsingExtension(
+                                        InternalMiniClusterExtension.class)),
+                outsideFlinkRuntimePackages()
+                        .and(isAnnotatedWithExtendWithUsingExtension(MiniClusterExtension.class)));
     }
 
-    private static DescribedPredicate allCallbackWrapper() {
-        return containAnyFieldsInClassHierarchyThat(
-                arePublicStaticFinalOfTypeWithAnnotation(
-                        AllCallbackWrapper.class, RegisterExtension.class));
+    private static DescribedPredicate isAnnotatedWithExtendWithUsingExtension(
+            Class extensionClass) {
+        return DescribedPredicate.describe(
+                "is annotated with @ExtendWith with class " + extensionClass.getSimpleName(),
+                clazz ->
+                        clazz.isAnnotatedWith(ExtendWith.class)
+                                && Arrays.asList(
+                                                clazz.getAnnotationOfType(ExtendWith.class).value())
+                                        .contains(extensionClass));
     }
 }
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7 b/flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7
index f79360e6b5984..a370e4c6cfc4f 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7
+++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7
@@ -1 +1,6 @@
-org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
+org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/archunit-violations/a6cbd99c-b115-447a-8f19-43c1094db549 b/flink-connectors/flink-connector-aws-kinesis-firehose/archunit-violations/a6cbd99c-b115-447a-8f19-43c1094db549
index 0cdac11509207..5ad7b147a8c3b 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/archunit-violations/a6cbd99c-b115-447a-8f19-43c1094db549
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/archunit-violations/a6cbd99c-b115-447a-8f19-43c1094db549
@@ -1 +1,6 @@
-org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
+org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-base/archunit-violations/8ab2328f-b38b-4e34-b768-0deb6b6171fb b/flink-connectors/flink-connector-base/archunit-violations/8ab2328f-b38b-4e34-b768-0deb6b6171fb
index d81373f3f0c71..03ba501652365 100644
--- a/flink-connectors/flink-connector-base/archunit-violations/8ab2328f-b38b-4e34-b768-0deb6b6171fb
+++ b/flink-connectors/flink-connector-base/archunit-violations/8ab2328f-b38b-4e34-b768-0deb6b6171fb
@@ -1,2 +1,12 @@
-org.apache.flink.connector.base.sink.AsyncSinkBaseITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.base.source.reader.CoordinatedSourceRescaleITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
+org.apache.flink.connector.base.sink.AsyncSinkBaseITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.base.source.reader.CoordinatedSourceRescaleITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/AlignedWatermarksITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/AlignedWatermarksITCase.java
index 84107518580e3..bf36e2c3c6c40 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/AlignedWatermarksITCase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/AlignedWatermarksITCase.java
@@ -27,7 +27,6 @@
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.testutils.AllCallbackWrapper;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -39,7 +38,8 @@
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.test.util.MiniClusterWithClientExtension;
+import org.apache.flink.test.junit5.InjectMiniCluster;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.testutils.logging.LoggerAuditingExtension;
 
 import org.junit.jupiter.api.Test;
@@ -71,21 +71,17 @@ public class AlignedWatermarksITCase {
 
     private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
 
-    public static final MiniClusterWithClientExtension MINI_CLUSTER_RESOURCE =
-            new MiniClusterWithClientExtension(
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
                             .setNumberTaskManagers(1)
                             .setConfiguration(reporter.addToConfiguration(new Configuration()))
                             .build());
 
-    @RegisterExtension
-    public static final AllCallbackWrapper ALL_WRAPPER =
-            new AllCallbackWrapper<>(MINI_CLUSTER_RESOURCE);
-
     @Test
-    public void testAlignment() throws Exception {
+    public void testAlignment(@InjectMiniCluster MiniCluster miniCluster) throws Exception {
         final JobGraph jobGraph = getJobGraph();
-        final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
         final CompletableFuture submission = miniCluster.submitJob(jobGraph);
         final JobID jobID = submission.get().getJobID();
         CommonTestUtils.waitForAllTaskRunning(miniCluster, jobID, false);
diff --git a/flink-connectors/flink-connector-cassandra/archunit-violations/dc1ba6f4-3d84-498c-a085-e02ba5936201 b/flink-connectors/flink-connector-cassandra/archunit-violations/dc1ba6f4-3d84-498c-a085-e02ba5936201
index 895e122724e68..15fa3a72cecad 100644
--- a/flink-connectors/flink-connector-cassandra/archunit-violations/dc1ba6f4-3d84-498c-a085-e02ba5936201
+++ b/flink-connectors/flink-connector-cassandra/archunit-violations/dc1ba6f4-3d84-498c-a085-e02ba5936201
@@ -1 +1,6 @@
-org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
+org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-elasticsearch-base/archunit-violations/dd583797-83e1-414c-a38d-330773978813 b/flink-connectors/flink-connector-elasticsearch-base/archunit-violations/dd583797-83e1-414c-a38d-330773978813
index 249f36169da7a..a215b58b5c413 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/archunit-violations/dd583797-83e1-414c-a38d-330773978813
+++ b/flink-connectors/flink-connector-elasticsearch-base/archunit-violations/dd583797-83e1-414c-a38d-330773978813
@@ -1 +1,6 @@
-org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriterITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
+org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriterITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-elasticsearch6/archunit-violations/db3972e4-f3a3-45b2-9643-27cba0cef09d b/flink-connectors/flink-connector-elasticsearch6/archunit-violations/db3972e4-f3a3-45b2-9643-27cba0cef09d
index 2520fe866faf6..341409ace58b8 100644
--- a/flink-connectors/flink-connector-elasticsearch6/archunit-violations/db3972e4-f3a3-45b2-9643-27cba0cef09d
+++ b/flink-connectors/flink-connector-elasticsearch6/archunit-violations/db3972e4-f3a3-45b2-9643-27cba0cef09d
@@ -1,2 +1,12 @@
-org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.elasticsearch.table.Elasticsearch6DynamicSinkITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
+org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.elasticsearch.table.Elasticsearch6DynamicSinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-elasticsearch7/archunit-violations/e1f30f33-c61c-4707-8c78-a3a80479564e b/flink-connectors/flink-connector-elasticsearch7/archunit-violations/e1f30f33-c61c-4707-8c78-a3a80479564e
index 854c270e633ad..e6b90b932bee7 100644
--- a/flink-connectors/flink-connector-elasticsearch7/archunit-violations/e1f30f33-c61c-4707-8c78-a3a80479564e
+++ b/flink-connectors/flink-connector-elasticsearch7/archunit-violations/e1f30f33-c61c-4707-8c78-a3a80479564e
@@ -1,2 +1,12 @@
-org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.elasticsearch.table.Elasticsearch7DynamicSinkITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
+org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.elasticsearch.table.Elasticsearch7DynamicSinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-files/archunit-violations/f5e3e868-8d92-4258-9654-a605dc9c550f b/flink-connectors/flink-connector-files/archunit-violations/f5e3e868-8d92-4258-9654-a605dc9c550f
index 1a9418020375f..ff88812bba2ef 100644
--- a/flink-connectors/flink-connector-files/archunit-violations/f5e3e868-8d92-4258-9654-a605dc9c550f
+++ b/flink-connectors/flink-connector-files/archunit-violations/f5e3e868-8d92-4258-9654-a605dc9c550f
@@ -1,3 +1,18 @@
-org.apache.flink.connector.file.sink.BatchExecutionFileSinkITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.file.sink.StreamingExecutionFileSinkITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.file.sink.writer.FileSinkMigrationITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
+org.apache.flink.connector.file.sink.BatchExecutionFileSinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.file.sink.StreamingExecutionFileSinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.file.sink.writer.FileSinkMigrationITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-hive/archunit-violations/26d337fc-45c4-4d03-a84a-6692c37fafbc b/flink-connectors/flink-connector-hive/archunit-violations/26d337fc-45c4-4d03-a84a-6692c37fafbc
index 115f4c9c1db50..0113d84f20c60 100644
--- a/flink-connectors/flink-connector-hive/archunit-violations/26d337fc-45c4-4d03-a84a-6692c37fafbc
+++ b/flink-connectors/flink-connector-hive/archunit-violations/26d337fc-45c4-4d03-a84a-6692c37fafbc
@@ -1,11 +1,66 @@
-org.apache.flink.connectors.hive.HiveDialectITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connectors.hive.HiveDialectQueryITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connectors.hive.HiveLookupJoinITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connectors.hive.HiveRunnerITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connectors.hive.HiveSourceITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connectors.hive.HiveTableSinkITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connectors.hive.HiveTableSourceITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connectors.hive.HiveTemporalJoinITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connectors.hive.TableEnvHiveConnectorITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReaderITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.table.catalog.hive.HiveCatalogITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
+org.apache.flink.connectors.hive.HiveDialectITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connectors.hive.HiveDialectQueryITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connectors.hive.HiveLookupJoinITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connectors.hive.HiveRunnerITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connectors.hive.HiveSourceITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connectors.hive.HiveTableSinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connectors.hive.HiveTableSourceITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connectors.hive.HiveTemporalJoinITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connectors.hive.TableEnvHiveConnectorITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReaderITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.table.catalog.hive.HiveCatalogITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-jdbc/archunit-violations/6b9ab1b0-c14d-4667-bab5-407b81fba98b b/flink-connectors/flink-connector-jdbc/archunit-violations/6b9ab1b0-c14d-4667-bab5-407b81fba98b
index c60803a1221cc..ecb32df4ac599 100644
--- a/flink-connectors/flink-connector-jdbc/archunit-violations/6b9ab1b0-c14d-4667-bab5-407b81fba98b
+++ b/flink-connectors/flink-connector-jdbc/archunit-violations/6b9ab1b0-c14d-4667-bab5-407b81fba98b
@@ -1,3 +1,18 @@
-org.apache.flink.connector.jdbc.JdbcITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.jdbc.catalog.MySqlCatalogITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.jdbc.catalog.PostgresCatalogITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
+org.apache.flink.connector.jdbc.JdbcITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.jdbc.catalog.MySqlCatalogITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.jdbc.catalog.PostgresCatalogITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-kafka/archunit-violations/97dda445-f6bc-43e2-8106-5876ca0cd052 b/flink-connectors/flink-connector-kafka/archunit-violations/97dda445-f6bc-43e2-8106-5876ca0cd052
index 52e14f9113660..c1e6561853301 100644
--- a/flink-connectors/flink-connector-kafka/archunit-violations/97dda445-f6bc-43e2-8106-5876ca0cd052
+++ b/flink-connectors/flink-connector-kafka/archunit-violations/97dda445-f6bc-43e2-8106-5876ca0cd052
@@ -1,13 +1,78 @@
-org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducerITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.kafka.sink.KafkaSinkITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.kafka.sink.KafkaTransactionLogITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.kafka.sink.KafkaWriterITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.kafka.source.KafkaSourceITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.streaming.connectors.kafka.KafkaITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.streaming.connectors.kafka.KafkaProducerAtLeastOnceITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.streaming.connectors.kafka.KafkaProducerExactlyOnceITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleExactlyOnceITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
+org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducerITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.kafka.sink.KafkaSinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.kafka.sink.KafkaTransactionLogITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.kafka.sink.KafkaWriterITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.kafka.source.KafkaSourceITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.streaming.connectors.kafka.KafkaITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.streaming.connectors.kafka.KafkaProducerAtLeastOnceITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.streaming.connectors.kafka.KafkaProducerExactlyOnceITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleExactlyOnceITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
index ef67997c84e4d..94e23d7b0977e 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
@@ -22,11 +22,10 @@
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
 import org.apache.flink.connector.pulsar.testutils.function.ControlSource;
-import org.apache.flink.core.testutils.AllCallbackWrapper;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
-import org.apache.flink.runtime.testutils.MiniClusterExtension;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.testutils.junit.SharedObjectsExtension;
 
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -46,7 +45,8 @@ class PulsarSinkITCase extends PulsarTestSuiteBase {
 
     private static final int PARALLELISM = 1;
 
-    public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
             new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
                             .setNumberTaskManagers(1)
@@ -55,11 +55,6 @@ class PulsarSinkITCase extends PulsarTestSuiteBase {
                             .withHaLeadershipControl()
                             .build());
 
-    @SuppressWarnings("unused")
-    @RegisterExtension
-    public static final AllCallbackWrapper CALLBACK_WRAPPER =
-            new AllCallbackWrapper<>(MINI_CLUSTER_RESOURCE);
-
     // Using this extension for creating shared reference which would be used in source function.
     @RegisterExtension final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
 
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnMinioS3StoreITCase.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnMinioS3StoreITCase.java
index ebe71daa4e2bb..bd34590d0eede 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnMinioS3StoreITCase.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnMinioS3StoreITCase.java
@@ -22,16 +22,14 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.testutils.AllCallbackWrapper;
-import org.apache.flink.core.testutils.EachCallbackWrapper;
 import org.apache.flink.core.testutils.TestContainerExtension;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.highavailability.AbstractHAJobRunITCase;
 import org.apache.flink.runtime.highavailability.FileSystemJobResultStore;
 import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
-import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.runtime.testutils.MiniClusterExtension;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
 
@@ -65,21 +63,15 @@ public abstract class HAJobRunOnMinioS3StoreITCase extends AbstractHAJobRunITCas
 
     @RegisterExtension
     @Order(3)
-    private static final EachCallbackWrapper miniClusterExtension =
-            new EachCallbackWrapper<>(
-                    new MiniClusterExtension(
-                            () -> {
-                                final Configuration configuration = createConfiguration();
-                                FileSystem.initialize(configuration, null);
-                                return new MiniClusterResourceConfiguration.Builder()
-                                        .setConfiguration(configuration)
-                                        .build();
-                            }));
-
-    @Override
-    public MiniCluster getMiniCluster() {
-        return miniClusterExtension.getCustomExtension().getMiniCluster();
-    }
+    private static final MiniClusterExtension miniClusterExtension =
+            new MiniClusterExtension(
+                    () -> {
+                        final Configuration configuration = createConfiguration();
+                        FileSystem.initialize(configuration, null);
+                        return new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(configuration)
+                                .build();
+                    });
 
     private static MinioTestContainer getMinioContainer() {
         return MINIO_EXTENSION.getCustomExtension().getTestContainer();
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
index a9a64e71d35c4..97fd78459fc11 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
@@ -26,16 +26,15 @@
 import org.apache.flink.connector.file.src.FileSource;
 import org.apache.flink.connector.file.src.reader.StreamFormat;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.testutils.AllCallbackWrapper;
 import org.apache.flink.formats.common.Converter;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
-import org.apache.flink.runtime.testutils.MiniClusterExtension;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
 import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.function.FunctionWithException;
 
@@ -79,6 +78,7 @@ public class DataStreamCsvITCase {
 
     @TempDir File outDir;
 
+    @RegisterExtension
     private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
             new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
@@ -88,10 +88,6 @@ public class DataStreamCsvITCase {
                             .withHaLeadershipControl()
                             .build());
 
-    @RegisterExtension
-    private static AllCallbackWrapper allCallbackWrapper =
-            new AllCallbackWrapper<>(MINI_CLUSTER_RESOURCE);
-
     // ------------------------------------------------------------------------
     //  test data
     // ------------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 54189bd00904f..9da584dfdca2c 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -25,7 +25,6 @@
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
-import org.apache.flink.core.testutils.AllCallbackWrapper;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
@@ -37,7 +36,8 @@
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.MiniClusterWithClientExtension;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.util.concurrent.FutureUtils;
 
@@ -57,18 +57,15 @@
 /** Tests to verify JMX reporter functionality on the JobManager. */
 class JMXJobManagerMetricTest {
 
-    private static final MiniClusterWithClientExtension MINI_CLUSTER_RESOURCE =
-            new MiniClusterWithClientExtension(
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
                             .setConfiguration(getConfiguration())
                             .setNumberSlotsPerTaskManager(1)
                             .setNumberTaskManagers(1)
                             .build());
 
-    @RegisterExtension
-    private static final AllCallbackWrapper ALL_WRAPPER =
-            new AllCallbackWrapper<>(MINI_CLUSTER_RESOURCE);
-
     private static Configuration getConfiguration() {
         Configuration flinkConfiguration = new Configuration();
 
@@ -84,7 +81,8 @@ private static Configuration getConfiguration() {
 
     /** Tests that metrics registered on the JobManager are actually accessible via JMX. */
     @Test
-    void testJobManagerJMXMetricAccess() throws Exception {
+    void testJobManagerJMXMetricAccess(@InjectClusterClient ClusterClient client)
+            throws Exception {
         Deadline deadline = Deadline.now().plus(Duration.ofMinutes(2));
 
         try {
@@ -113,7 +111,6 @@ void testJobManagerJMXMetricAccess() throws Exception {
                             .setJobCheckpointingSettings(jobCheckpointingSettings)
                             .build();
 
-            ClusterClient client = MINI_CLUSTER_RESOURCE.getClusterClient();
             client.submitJob(jobGraph).get();
 
             FutureUtils.retrySuccessfulWithDelay(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAJobRunITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAJobRunITCase.java
index 9398cdafa984d..80f9407386564 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAJobRunITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAJobRunITCase.java
@@ -29,6 +29,7 @@
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
+import org.apache.flink.test.junit5.InjectMiniCluster;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.concurrent.FutureUtils;
@@ -46,6 +47,9 @@
 /**
  * {@code AbstractHAJobRunITCase} runs a job storing in HA mode and provides {@code abstract}
  * methods for initializing a specific {@link FileSystem}.
+ *
+ * 

Sub-classes must use a {@link + * org.apache.flink.runtime.testutils.InternalMiniClusterExtension}. */ @ExtendWith(TestLoggerExtension.class) public abstract class AbstractHAJobRunITCase { @@ -72,12 +76,9 @@ protected static Configuration addHaConfiguration( protected void runAfterJobTermination() throws Exception {} - protected abstract MiniCluster getMiniCluster(); - @Test - public void testJobExecutionInHaMode() throws Exception { - final MiniCluster flinkCluster = getMiniCluster(); - + public void testJobExecutionInHaMode(@InjectMiniCluster MiniCluster flinkCluster) + throws Exception { final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph(); // providing a timeout helps making the test fail in case some issue occurred while diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java index 09d60e6207972..bb5c6bf006f66 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java @@ -25,7 +25,6 @@ import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.core.testutils.AllCallbackWrapper; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; @@ -40,8 +39,9 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.minicluster.MiniCluster; -import org.apache.flink.runtime.testutils.MiniClusterExtension; +import org.apache.flink.runtime.testutils.InternalMiniClusterExtension; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.junit5.InjectMiniCluster; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.types.LongValue; import org.apache.flink.util.TestLoggerExtension; @@ -71,16 +71,13 @@ public class TaskCancelAsyncProducerConsumerITCase { private static volatile Thread ASYNC_PRODUCER_THREAD; private static volatile Thread ASYNC_CONSUMER_THREAD; - public static final MiniClusterExtension MINI_CLUSTER_RESOURCE = - new MiniClusterExtension( + @RegisterExtension + private static final InternalMiniClusterExtension MINI_CLUSTER_RESOURCE = + new InternalMiniClusterExtension( new MiniClusterResourceConfiguration.Builder() .setConfiguration(getFlinkConfiguration()) .build()); - @RegisterExtension - public static AllCallbackWrapper allCallbackWrapper = - new AllCallbackWrapper(MINI_CLUSTER_RESOURCE); - private static Configuration getFlinkConfiguration() { Configuration config = new Configuration(); config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4096")); @@ -97,7 +94,8 @@ private static Configuration getFlinkConfiguration() { * the main task Thread. */ @Test - public void testCancelAsyncProducerAndConsumer() throws Exception { + public void testCancelAsyncProducerAndConsumer(@InjectMiniCluster MiniCluster flink) + throws Exception { Deadline deadline = Deadline.now().plus(Duration.ofMinutes(2)); // Job with async producer and consumer @@ -117,8 +115,6 @@ public void testCancelAsyncProducerAndConsumer() throws Exception { JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(producer, consumer); - final MiniCluster flink = MINI_CLUSTER_RESOURCE.getMiniCluster(); - // Submit job and wait until running flink.runDetached(jobGraph); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InternalMiniClusterExtension.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InternalMiniClusterExtension.java new file mode 100644 index 0000000000000..5a89834833a4a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InternalMiniClusterExtension.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.testutils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.test.junit5.InjectClusterClientConfiguration; +import org.apache.flink.test.junit5.InjectClusterRESTAddress; +import org.apache.flink.test.junit5.InjectMiniCluster; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; +import org.junit.jupiter.api.extension.ParameterResolver; + +import java.net.URI; + +/** + * An extension which starts a {@link MiniCluster} for testing purposes. + * + *

This should only be used by tests within the flink-runtime module. Other modules should use + * {@code MiniClusterExtension} provided by flink-test-utils module. + */ +@Internal +public class InternalMiniClusterExtension + implements BeforeAllCallback, AfterAllCallback, ParameterResolver { + + private final MiniClusterResource miniClusterResource; + + public InternalMiniClusterExtension( + final MiniClusterResourceConfiguration miniClusterResourceConfiguration) { + this.miniClusterResource = new MiniClusterResource(miniClusterResourceConfiguration); + } + + public int getNumberSlots() { + return miniClusterResource.getNumberSlots(); + } + + public MiniCluster getMiniCluster() { + return miniClusterResource.getMiniCluster(); + } + + public UnmodifiableConfiguration getClientConfiguration() { + return miniClusterResource.getClientConfiguration(); + } + + public URI getRestAddres() { + return miniClusterResource.getRestAddres(); + } + + @Override + public void beforeAll(ExtensionContext context) throws Exception { + miniClusterResource.before(); + } + + @Override + public void afterAll(ExtensionContext context) throws Exception { + miniClusterResource.after(); + } + + @Override + public boolean supportsParameter( + ParameterContext parameterContext, ExtensionContext extensionContext) + throws ParameterResolutionException { + Class parameterType = parameterContext.getParameter().getType(); + if (parameterContext.isAnnotated(InjectMiniCluster.class) + && parameterType.isAssignableFrom(MiniCluster.class)) { + return true; + } + if (parameterContext.isAnnotated(InjectClusterClientConfiguration.class) + && parameterType.isAssignableFrom(UnmodifiableConfiguration.class)) { + return true; + } + return parameterContext.isAnnotated(InjectClusterRESTAddress.class) + && parameterType.isAssignableFrom(URI.class); + } + + @Override + public Object resolveParameter( + ParameterContext parameterContext, ExtensionContext extensionContext) + throws ParameterResolutionException { + if (parameterContext.isAnnotated(InjectMiniCluster.class)) { + return miniClusterResource.getMiniCluster(); + } + if (parameterContext.isAnnotated(InjectClusterClientConfiguration.class)) { + return miniClusterResource.getClientConfiguration(); + } + if (parameterContext.isAnnotated(InjectClusterRESTAddress.class)) { + return miniClusterResource.getRestAddres(); + } + throw new ParameterResolutionException("Unsupported parameter"); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterExtension.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterExtension.java deleted file mode 100644 index 0e1115cc42bd8..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterExtension.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.testutils; - -import org.apache.flink.annotation.Experimental; -import org.apache.flink.configuration.UnmodifiableConfiguration; -import org.apache.flink.core.testutils.CustomExtension; -import org.apache.flink.runtime.minicluster.MiniCluster; - -import org.junit.jupiter.api.extension.ExtensionContext; - -import javax.annotation.Nullable; - -import java.net.URI; -import java.util.function.Supplier; - -/** An extension which starts a {@link MiniCluster} for testing purposes. */ -public class MiniClusterExtension implements CustomExtension { - - private final Supplier - miniClusterResourceConfigurationSupplier; - @Nullable private MiniClusterResource miniClusterResource; - - public MiniClusterExtension( - final MiniClusterResourceConfiguration miniClusterResourceConfiguration) { - this(() -> miniClusterResourceConfiguration); - } - - @Experimental - public MiniClusterExtension( - final Supplier - miniClusterResourceConfigurationSupplier) { - this.miniClusterResourceConfigurationSupplier = miniClusterResourceConfigurationSupplier; - } - - public int getNumberSlots() { - return miniClusterResource.getNumberSlots(); - } - - public MiniCluster getMiniCluster() { - return miniClusterResource.getMiniCluster(); - } - - public UnmodifiableConfiguration getClientConfiguration() { - return miniClusterResource.getClientConfiguration(); - } - - public URI getRestAddres() { - return miniClusterResource.getRestAddres(); - } - - @Override - public void before(ExtensionContext context) throws Exception { - this.miniClusterResource = - new MiniClusterResource(miniClusterResourceConfigurationSupplier.get()); - miniClusterResource.before(); - } - - @Override - public void after(ExtensionContext context) throws Exception { - miniClusterResource.after(); - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/test/junit5/InjectClusterClientConfiguration.java b/flink-runtime/src/test/java/org/apache/flink/test/junit5/InjectClusterClientConfiguration.java new file mode 100644 index 0000000000000..eb788bedae048 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/test/junit5/InjectClusterClientConfiguration.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.junit5; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.configuration.UnmodifiableConfiguration; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotate a test method parameter with this annotation to inject the {@link + * UnmodifiableConfiguration} for building a cluster client. + * + * @see org.apache.flink.test.junit5.MiniClusterExtension + */ +@Target(ElementType.PARAMETER) +@Retention(RetentionPolicy.RUNTIME) +@Experimental +public @interface InjectClusterClientConfiguration {} diff --git a/flink-runtime/src/test/java/org/apache/flink/test/junit5/InjectClusterRESTAddress.java b/flink-runtime/src/test/java/org/apache/flink/test/junit5/InjectClusterRESTAddress.java new file mode 100644 index 0000000000000..3afafcb9eda37 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/test/junit5/InjectClusterRESTAddress.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.junit5; + +import org.apache.flink.annotation.Experimental; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.net.URI; + +/** + * Annotate a test method parameter with this annotation to inject the {@link URI} REST address of + * the cluster. + * + * @see org.apache.flink.test.junit5.MiniClusterExtension + */ +@Target(ElementType.PARAMETER) +@Retention(RetentionPolicy.RUNTIME) +@Experimental +public @interface InjectClusterRESTAddress {} diff --git a/flink-runtime/src/test/java/org/apache/flink/test/junit5/InjectMiniCluster.java b/flink-runtime/src/test/java/org/apache/flink/test/junit5/InjectMiniCluster.java new file mode 100644 index 0000000000000..e0f3a1af4757d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/test/junit5/InjectMiniCluster.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.junit5; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.runtime.minicluster.MiniCluster; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotate a test method parameter with this annotation to inject the {@link MiniCluster} instance. + * + * @see org.apache.flink.test.junit5.MiniClusterExtension + */ +@Target(ElementType.PARAMETER) +@Retention(RetentionPolicy.RUNTIME) +@Experimental +public @interface InjectMiniCluster {} diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/InjectClusterClient.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/InjectClusterClient.java new file mode 100644 index 0000000000000..abc0adb11b48a --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/InjectClusterClient.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.junit5; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.rest.RestClusterClient; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotate a test method parameter with this annotation to inject the {@link ClusterClient} or the + * {@link RestClusterClient} instance. + * + * @see MiniClusterExtension + */ +@Target(ElementType.PARAMETER) +@Retention(RetentionPolicy.RUNTIME) +@Experimental +public @interface InjectClusterClient {} diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java new file mode 100644 index 0000000000000..4f880be56e06d --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.junit5; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.testutils.InternalMiniClusterExtension; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.TestStreamEnvironment; +import org.apache.flink.test.util.TestEnvironment; + +import org.junit.jupiter.api.TestFactory; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; +import org.junit.jupiter.api.extension.ParameterResolver; +import org.junit.jupiter.params.ParameterizedTest; + +import java.util.function.Supplier; + +/** + * Starts a Flink {@link MiniCluster} and registers the respective {@link ExecutionEnvironment} and + * {@link StreamExecutionEnvironment} in the correct thread local environment. + * + *

Example usage: + * + *

{@code
+ * @ExtendWith(MiniClusterExtension.class)
+ * class MyTest {
+ *      @Test
+ *      public void myTest() {
+ *          ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+ *      }
+ * }
+ * }
+ * + *

Or to tune the {@link MiniCluster} parameters: + * + *

{@code
+ * class MyTest {
+ *
+ *     @RegisterExtension
+ *     public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ *             new MiniClusterExtension(
+ *                     new MiniClusterResourceConfiguration.Builder()
+ *                             .setNumberTaskManagers(1)
+ *                             .setConfiguration(new Configuration())
+ *                             .build());
+ *
+ *     @Test
+ *     public void myTest() {
+ *          ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+ *     }
+ * }
+ * }
+ * + *

You can use parameter injection with the annotations {@link InjectMiniCluster}, {@link + * InjectClusterClient}, {@link InjectClusterRESTAddress}, {@link InjectClusterClientConfiguration}: + * + *

{@code
+ * class MyTest {
+ *
+ *     @RegisterExtension
+ *     public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ *             new MiniClusterExtension(
+ *                     new MiniClusterResourceConfiguration.Builder()
+ *                             .setNumberTaskManagers(1)
+ *                             .setConfiguration(new Configuration())
+ *                             .build());
+ *
+ *     @Test
+ *     public void myTest(@InjectMiniCluster MiniCluster miniCluster) {
+ *          // Use miniCluster
+ *     }
+ *
+ *     @Test
+ *     public void myTest(@InjectClusterClient ClusterClient clusterClient) {
+ *          // clusterClient here is an instance of MiniClusterClient
+ *     }
+ *
+ *     @Test
+ *     public void myTest(@InjectClusterClient RestClusterClient restClusterClient) {
+ *          // Using RestClusterClient as parameter type will force the creation of a RestClusterClient,
+ *          //  rather than MiniClusterClient
+ *     }
+ * }
+ * }
+ * + *

You can use it both with programmatic and declarative extension annotations. Check some limitations. Use {@link + * ParameterizedTest} instead. + */ +@Experimental +public final class MiniClusterExtension + implements BeforeAllCallback, + BeforeEachCallback, + AfterEachCallback, + AfterAllCallback, + ParameterResolver { + + private static final ExtensionContext.Namespace NAMESPACE = + ExtensionContext.Namespace.create(MiniClusterExtension.class); + + private static final String CLUSTER_REST_CLIENT = "clusterRestClient"; + private static final String MINI_CLUSTER_CLIENT = "miniClusterClient"; + + private final Supplier + miniClusterResourceConfigurationSupplier; + + private InternalMiniClusterExtension internalMiniClusterExtension; + + public MiniClusterExtension() { + this( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(1) + .build()); + } + + public MiniClusterExtension( + final MiniClusterResourceConfiguration miniClusterResourceConfiguration) { + this(() -> miniClusterResourceConfiguration); + } + + @Experimental + public MiniClusterExtension( + Supplier miniClusterResourceConfigurationSupplier) { + this.miniClusterResourceConfigurationSupplier = miniClusterResourceConfigurationSupplier; + } + + // Accessors + + @Override + public boolean supportsParameter( + ParameterContext parameterContext, ExtensionContext extensionContext) + throws ParameterResolutionException { + Class parameterType = parameterContext.getParameter().getType(); + if (parameterContext.isAnnotated(InjectClusterClient.class) + && parameterType.isAssignableFrom(ClusterClient.class)) { + return true; + } + return internalMiniClusterExtension.supportsParameter(parameterContext, extensionContext); + } + + @Override + public Object resolveParameter( + ParameterContext parameterContext, ExtensionContext extensionContext) + throws ParameterResolutionException { + Class parameterType = parameterContext.getParameter().getType(); + if (parameterContext.isAnnotated(InjectClusterClient.class)) { + if (parameterType.equals(RestClusterClient.class)) { + return extensionContext + .getStore(NAMESPACE) + .getOrComputeIfAbsent( + CLUSTER_REST_CLIENT, + k -> { + try { + return new CloseableParameter<>( + createRestClusterClient( + internalMiniClusterExtension)); + } catch (Exception e) { + throw new ParameterResolutionException( + "Cannot create rest cluster client", e); + } + }, + CloseableParameter.class) + .get(); + } + // Default to MiniClusterClient + return extensionContext + .getStore(NAMESPACE) + .getOrComputeIfAbsent( + MINI_CLUSTER_CLIENT, + k -> { + try { + return new CloseableParameter<>( + createMiniClusterClient(internalMiniClusterExtension)); + } catch (Exception e) { + throw new ParameterResolutionException( + "Cannot create mini cluster client", e); + } + }, + CloseableParameter.class) + .get(); + } + return internalMiniClusterExtension.resolveParameter(parameterContext, extensionContext); + } + + // Lifecycle implementation + + @Override + public void beforeAll(ExtensionContext context) throws Exception { + internalMiniClusterExtension = + new InternalMiniClusterExtension(miniClusterResourceConfigurationSupplier.get()); + internalMiniClusterExtension.beforeAll(context); + } + + @Override + public void beforeEach(ExtensionContext context) throws Exception { + registerEnv(internalMiniClusterExtension); + } + + @Override + public void afterEach(ExtensionContext context) throws Exception { + unregisterEnv(internalMiniClusterExtension); + } + + @Override + public void afterAll(ExtensionContext context) throws Exception { + internalMiniClusterExtension.afterAll(context); + } + + // Implementation + + private void registerEnv(InternalMiniClusterExtension internalMiniClusterExtension) { + TestEnvironment executionEnvironment = + new TestEnvironment( + internalMiniClusterExtension.getMiniCluster(), + internalMiniClusterExtension.getNumberSlots(), + false); + executionEnvironment.setAsContext(); + TestStreamEnvironment.setAsContext( + internalMiniClusterExtension.getMiniCluster(), + internalMiniClusterExtension.getNumberSlots()); + } + + private void unregisterEnv(InternalMiniClusterExtension internalMiniClusterExtension) { + TestStreamEnvironment.unsetAsContext(); + TestEnvironment.unsetAsContext(); + } + + private MiniClusterClient createMiniClusterClient( + InternalMiniClusterExtension internalMiniClusterExtension) { + return new MiniClusterClient( + internalMiniClusterExtension.getClientConfiguration(), + internalMiniClusterExtension.getMiniCluster()); + } + + private RestClusterClient createRestClusterClient( + InternalMiniClusterExtension internalMiniClusterExtension) throws Exception { + return new RestClusterClient<>( + internalMiniClusterExtension.getClientConfiguration(), + MiniClusterClient.MiniClusterId.INSTANCE); + } + + private static class CloseableParameter + implements ExtensionContext.Store.CloseableResource { + private final T autoCloseable; + + CloseableParameter(T autoCloseable) { + this.autoCloseable = autoCloseable; + } + + public T get() { + return autoCloseable; + } + + @Override + public void close() throws Throwable { + this.autoCloseable.close(); + } + } +} diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientExtension.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientExtension.java deleted file mode 100644 index 28650f767013a..0000000000000 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientExtension.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.util; - -import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.client.program.MiniClusterClient; -import org.apache.flink.client.program.rest.RestClusterClient; -import org.apache.flink.runtime.testutils.MiniClusterExtension; -import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.streaming.util.TestStreamEnvironment; -import org.apache.flink.util.ExceptionUtils; - -import org.junit.jupiter.api.extension.ExtensionContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Starts a Flink mini cluster as a resource and registers the respective ExecutionEnvironment and - * StreamExecutionEnvironment. - */ -public class MiniClusterWithClientExtension extends MiniClusterExtension { - private static final Logger LOG = LoggerFactory.getLogger(MiniClusterWithClientExtension.class); - - private ClusterClient clusterClient; - private RestClusterClient restClusterClient; - - private TestEnvironment executionEnvironment; - - public MiniClusterWithClientExtension( - final MiniClusterResourceConfiguration miniClusterResourceConfiguration) { - super(miniClusterResourceConfiguration); - } - - public ClusterClient getClusterClient() { - return clusterClient; - } - - /** - * Returns a {@link RestClusterClient} that can be used to communicate with this mini cluster. - * Only use this if the client returned via {@link #getClusterClient()} does not fulfill your - * needs. - */ - public RestClusterClient getRestClusterClient() throws Exception { - return restClusterClient; - } - - public TestEnvironment getTestEnvironment() { - return executionEnvironment; - } - - @Override - public void before(ExtensionContext context) throws Exception { - super.before(context); - - clusterClient = createMiniClusterClient(); - restClusterClient = createRestClusterClient(); - - executionEnvironment = new TestEnvironment(getMiniCluster(), getNumberSlots(), false); - executionEnvironment.setAsContext(); - TestStreamEnvironment.setAsContext(getMiniCluster(), getNumberSlots()); - } - - @Override - public void after(ExtensionContext context) throws Exception { - LOG.info("Finalization triggered: Cluster shutdown is going to be initiated."); - TestStreamEnvironment.unsetAsContext(); - TestEnvironment.unsetAsContext(); - - Exception exception = null; - - if (clusterClient != null) { - try { - clusterClient.close(); - } catch (Exception e) { - exception = e; - } - } - - clusterClient = null; - - if (restClusterClient != null) { - try { - restClusterClient.close(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - } - - restClusterClient = null; - - super.after(context); - - if (exception != null) { - LOG.warn("Could not properly shut down the MiniClusterWithClientResource.", exception); - } - } - - private MiniClusterClient createMiniClusterClient() { - return new MiniClusterClient(getClientConfiguration(), getMiniCluster()); - } - - private RestClusterClient createRestClusterClient() - throws Exception { - return new RestClusterClient<>( - getClientConfiguration(), MiniClusterClient.MiniClusterId.INSTANCE); - } -}