Skip to content

Commit

Permalink
[hotfix] Refactor YARNFileReplicationITCase to avoid inheritance
Browse files Browse the repository at this point in the history
  • Loading branch information
tisonkun committed Feb 20, 2020
1 parent 9d91aac commit 110b99d
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,18 @@

package org.apache.flink.yarn;

import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.util.YarnTestUtils;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -29,12 +39,24 @@
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.yarn.configuration.YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

/**
* Test cases for the deployment of Yarn Flink clusters with customized file replication numbers.
*/
public class YARNFileReplicationITCase extends YARNITCase {
public class YARNFileReplicationITCase extends YarnTestBase {

private static final Duration yarnAppTerminateTimeout = Duration.ofSeconds(10);
private static final int sleepIntervalInMS = 100;

@BeforeClass
public static void setup() {
Expand All @@ -44,25 +66,78 @@ public static void setup() {

@Test
public void testPerJobModeWithCustomizedFileReplication() throws Exception {
Configuration configuration = createDefaultConfiguration(YarnConfigOptions.UserJarInclusion.DISABLED);
final Configuration configuration = getDefaultConfiguration();
configuration.setInteger(YarnConfigOptions.FILE_REPLICATION, 4);

runTest(() -> deployPerjob(
configuration,
getTestingJobGraph()));
runTest(() -> deployPerJob(configuration, getTestingJobGraph()));
}

@Test
public void testPerJobModeWithDefaultFileReplication() throws Exception {
Configuration configuration = createDefaultConfiguration(YarnConfigOptions.UserJarInclusion.DISABLED);
runTest(() -> deployPerJob(getDefaultConfiguration(), getTestingJobGraph()));
}

private void deployPerJob(Configuration configuration, JobGraph jobGraph) throws Exception {
try (final YarnClusterDescriptor yarnClusterDescriptor = createYarnClusterDescriptor(configuration)) {

yarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));

final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(768)
.setTaskManagerMemoryMB(1024)
.setSlotsPerTaskManager(1)
.createClusterSpecification();

File testingJar = YarnTestBase.findFile("..", new YarnTestUtils.TestJarFinder("flink-yarn-tests"));

jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
try (ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor
.deployJobCluster(
clusterSpecification,
jobGraph,
false)
.getClusterClient()) {

ApplicationId applicationId = clusterClient.getClusterId();

final CompletableFuture<JobResult> jobResultCompletableFuture = clusterClient.requestJobResult(jobGraph.getJobID());

final JobResult jobResult = jobResultCompletableFuture.get();

assertThat(jobResult, is(notNullValue()));
assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));

extraVerification(configuration, applicationId);

waitApplicationFinishedElseKillIt(
applicationId, yarnAppTerminateTimeout, yarnClusterDescriptor, sleepIntervalInMS);
}
}
}

private JobGraph getTestingJobGraph() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);

env.addSource(new NoDataSource())
.shuffle()
.addSink(new DiscardingSink<>());

return env.getStreamGraph().getJobGraph();
}

private Configuration getDefaultConfiguration() {
final Configuration configuration = new Configuration();
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g"));
configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
configuration.setString(CLASSPATH_INCLUDE_USER_JAR, YarnConfigOptions.UserJarInclusion.DISABLED.toString());

runTest(() -> deployPerjob(
configuration,
getTestingJobGraph()));
return configuration;
}

@Override
protected void extraVerification(Configuration configuration, ApplicationId applicationId) throws Exception {
private void extraVerification(Configuration configuration, ApplicationId applicationId) throws Exception {
final FileSystem fs = FileSystem.get(getYarnConfiguration());

String suffix = ".flink/" + applicationId.toString() + "/" + flinkUberjar.getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@
*/
public class YARNITCase extends YarnTestBase {

private final Duration yarnAppTerminateTimeout = Duration.ofSeconds(10);

private final int sleepIntervalInMS = 100;
private static final Duration yarnAppTerminateTimeout = Duration.ofSeconds(10);
private static final int sleepIntervalInMS = 100;

@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
Expand All @@ -69,27 +68,26 @@ public static void setup() {

@Test
public void testPerJobModeWithEnableSystemClassPathIncludeUserJar() throws Exception {
runTest(() -> deployPerjob(
runTest(() -> deployPerJob(
createDefaultConfiguration(YarnConfigOptions.UserJarInclusion.FIRST),
getTestingJobGraph()));
}

@Test
public void testPerJobModeWithDisableSystemClassPathIncludeUserJar() throws Exception {
runTest(() -> deployPerjob(
runTest(() -> deployPerJob(
createDefaultConfiguration(YarnConfigOptions.UserJarInclusion.DISABLED),
getTestingJobGraph()));
}

@Test
public void testPerJobModeWithDistributedCache() throws Exception {
runTest(() -> deployPerjob(
runTest(() -> deployPerJob(
createDefaultConfiguration(YarnConfigOptions.UserJarInclusion.DISABLED),
YarnTestCacheJob.getDistributedCacheJobGraph(tmp.newFolder())));
}

protected void deployPerjob(Configuration configuration, JobGraph jobGraph) throws Exception {

private void deployPerJob(Configuration configuration, JobGraph jobGraph) throws Exception {
try (final YarnClusterDescriptor yarnClusterDescriptor = createYarnClusterDescriptor(configuration)) {

yarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
Expand Down Expand Up @@ -121,15 +119,13 @@ protected void deployPerjob(Configuration configuration, JobGraph jobGraph) thro
assertThat(jobResult, is(notNullValue()));
assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));

extraVerification(configuration, applicationId);

waitApplicationFinishedElseKillIt(
applicationId, yarnAppTerminateTimeout, yarnClusterDescriptor, sleepIntervalInMS);
}
}
}

protected JobGraph getTestingJobGraph() {
private JobGraph getTestingJobGraph() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);

Expand All @@ -140,14 +136,12 @@ protected JobGraph getTestingJobGraph() {
return env.getStreamGraph().getJobGraph();
}

protected Configuration createDefaultConfiguration(YarnConfigOptions.UserJarInclusion userJarInclusion) {
private Configuration createDefaultConfiguration(YarnConfigOptions.UserJarInclusion userJarInclusion) {
Configuration configuration = new Configuration();
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g"));
configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
configuration.setString(CLASSPATH_INCLUDE_USER_JAR, userJarInclusion.toString());

return configuration;
}

protected void extraVerification(Configuration configuration, ApplicationId applicationId) throws Exception { }
}

0 comments on commit 110b99d

Please sign in to comment.