diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java index dc6004df928dc..23876c48264cd 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java @@ -18,24 +18,30 @@ package org.apache.flink.streaming.util; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory; import org.apache.flink.test.util.MiniClusterPipelineExecutorServiceLoader; -import org.apache.flink.util.TestNameProvider; import java.net.URL; import java.time.Duration; import java.util.Collection; import java.util.Collections; +import static org.apache.flink.runtime.testutils.PseudoRandomValueSelector.randomize; + /** A {@link StreamExecutionEnvironment} that executes its jobs on {@link MiniCluster}. */ public class TestStreamEnvironment extends StreamExecutionEnvironment { + private static final String STATE_CHANGE_LOG_CONFIG_ON = "on"; + private static final String STATE_CHANGE_LOG_CONFIG_UNSET = "unset"; + private static final String STATE_CHANGE_LOG_CONFIG_RAND = "random"; private static final boolean RANDOMIZE_CHECKPOINTING_CONFIG = Boolean.parseBoolean(System.getProperty("checkpointing.randomization", "false")); + private static final String STATE_CHANGE_LOG_CONFIG = + System.getProperty("checkpointing.changelog", STATE_CHANGE_LOG_CONFIG_UNSET).trim(); public TestStreamEnvironment( MiniCluster miniCluster, @@ -75,7 +81,22 @@ public static void setAsContext( TestStreamEnvironment env = new TestStreamEnvironment( miniCluster, parallelism, jarFiles, classpaths); - randomize(conf); + if (RANDOMIZE_CHECKPOINTING_CONFIG) { + randomize( + conf, ExecutionCheckpointingOptions.ENABLE_UNALIGNED, true, false); + randomize( + conf, + ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT, + Duration.ofSeconds(0), + Duration.ofMillis(100), + Duration.ofSeconds(2)); + } + if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) { + conf.set(CheckpointingOptions.ENABLE_STATE_CHANGE_LOG, true); + } else if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase( + STATE_CHANGE_LOG_CONFIG_RAND)) { + randomize(conf, CheckpointingOptions.ENABLE_STATE_CHANGE_LOG, true, false); + } env.configure(conf, env.getUserClassloader()); return env; }; @@ -83,28 +104,6 @@ public static void setAsContext( initializeContextEnvironment(factory); } - /** - * Randomizes configuration on test case level even if mini cluster is used in a class rule. - * - *
Note that only unset properties are randomized.
- *
- * @param conf the configuration to randomize
- */
- private static void randomize(Configuration conf) {
- if (RANDOMIZE_CHECKPOINTING_CONFIG) {
- final String testName = TestNameProvider.getCurrentTestName();
- final PseudoRandomValueSelector valueSelector =
- PseudoRandomValueSelector.create(testName != null ? testName : "unknown");
- valueSelector.select(conf, ExecutionCheckpointingOptions.ENABLE_UNALIGNED, true, false);
- valueSelector.select(
- conf,
- ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT,
- Duration.ofSeconds(0),
- Duration.ofMillis(100),
- Duration.ofSeconds(2));
- }
- }
-
/**
* Sets the streaming context environment to a TestStreamEnvironment that runs its programs on
* the given cluster with the given default parallelism.
diff --git a/pom.xml b/pom.xml
index 619b4d546d65b..c48cc4fbe2cf6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1587,6 +1587,8 @@ under the License.