Skip to content

Commit

Permalink
[hotfix][tests] Restructure AbstractHaJobRunITCase
Browse files Browse the repository at this point in the history
Refactors the test to make use of a static MiniClusterExtension, in preperation FLINK-26252.
  • Loading branch information
zentol committed Feb 24, 2022
1 parent 881d4c6 commit 576354c
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 40 deletions.
6 changes: 6 additions & 0 deletions flink-filesystems/flink-s3-fs-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,12 @@ under the License.
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,25 @@

import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.core.testutils.TestContainerExtension;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.highavailability.AbstractHAJobRunITCase;
import org.apache.flink.runtime.highavailability.FileSystemJobResultStore;
import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterExtension;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;

import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;

import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.time.Duration;
Expand All @@ -51,10 +58,29 @@ public abstract class HAJobRunOnMinioS3StoreITCase extends AbstractHAJobRunITCas
private static final String JOB_RESULT_STORE_FOLDER = "jrs";

@RegisterExtension
@Order(2)
private static final AllCallbackWrapper<TestContainerExtension<MinioTestContainer>>
MINIO_EXTENSION =
new AllCallbackWrapper<>(new TestContainerExtension<>(MinioTestContainer::new));

@RegisterExtension
@Order(3)
private static final EachCallbackWrapper<MiniClusterExtension> miniClusterExtension =
new EachCallbackWrapper<>(
new MiniClusterExtension(
() -> {
final Configuration configuration = createConfiguration();
FileSystem.initialize(configuration, null);
return new MiniClusterResourceConfiguration.Builder()
.setConfiguration(configuration)
.build();
}));

@Override
public MiniCluster getMiniCluster() {
return miniClusterExtension.getCustomExtension().getMiniCluster();
}

private static MinioTestContainer getMinioContainer() {
return MINIO_EXTENSION.getCustomExtension().getTestContainer();
}
Expand All @@ -77,13 +103,7 @@ private static String createSubPath(String... subfolders) {
return pathSeparator + StringUtils.join(subfolders, pathSeparator);
}

@Override
protected String getHAStoragePath() {
return createS3URIWithSubPath(CLUSTER_ID);
}

@Override
protected Configuration createConfiguration() {
private static Configuration createConfiguration() {
final Configuration config = new Configuration();

getMinioContainer().setS3ConfigOptions(config);
Expand All @@ -94,7 +114,12 @@ protected Configuration createConfiguration() {
JobResultStoreOptions.STORAGE_PATH,
createS3URIWithSubPath(CLUSTER_ID, JOB_RESULT_STORE_FOLDER));

return config;
return addHaConfiguration(config, createS3URIWithSubPath(CLUSTER_ID));
}

@AfterAll
public static void unsetFileSystem() {
FileSystem.initialize(new Configuration(), null);
}

@Override
Expand Down
6 changes: 6 additions & 0 deletions flink-filesystems/flink-s3-fs-hadoop/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
Expand Down
6 changes: 6 additions & 0 deletions flink-filesystems/flink-s3-fs-presto/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,15 @@
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.MiniClusterExtension;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.concurrent.FutureUtils;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
Expand All @@ -54,25 +51,17 @@
public abstract class AbstractHAJobRunITCase {

@RegisterExtension
@Order(1)
private static final AllCallbackWrapper<ZooKeeperExtension> ZOOKEEPER_EXTENSION =
new AllCallbackWrapper<>(new ZooKeeperExtension());

@RegisterExtension
private final EachCallbackWrapper<MiniClusterExtension> miniClusterExtension =
new EachCallbackWrapper<>(
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getFlinkConfiguration())
.build()));

private Configuration getFlinkConfiguration() {
final Configuration config = createConfiguration();

protected static Configuration addHaConfiguration(
final Configuration config, final String haStoragePath) {
config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.set(
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString());
config.set(HighAvailabilityOptions.HA_STORAGE_PATH, getHAStoragePath());
config.set(HighAvailabilityOptions.HA_STORAGE_PATH, haStoragePath);

// getFlinkConfiguration() is called on each new instantiation of the MiniCluster which is
// happening before each test run
Expand All @@ -81,26 +70,13 @@ private Configuration getFlinkConfiguration() {
return config;
}

@AfterEach
public void unsetFileSystem() {
FileSystem.initialize(new Configuration(), null);
}

/**
* Should return the path to the HA storage which will be injected into the Flink configuration.
*
* @see HighAvailabilityOptions#HA_STORAGE_PATH
*/
protected abstract String getHAStoragePath();

/** Initializes the {@link Configuration} used for the Flink cluster. */
protected abstract Configuration createConfiguration();

protected void runAfterJobTermination() throws Exception {}

protected abstract MiniCluster getMiniCluster();

@Test
public void testJobExecutionInHaMode() throws Exception {
final MiniCluster flinkCluster = miniClusterExtension.getCustomExtension().getMiniCluster();
final MiniCluster flinkCluster = getMiniCluster();

final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();

Expand Down

0 comments on commit 576354c

Please sign in to comment.