Skip to content

Commit

Permalink
[FLINK-26496][yarn][tests] Migrate tests to JUnit5
Browse files Browse the repository at this point in the history
  • Loading branch information
RocMarshal authored May 4, 2022
1 parent eee2ab7 commit b98c66c
Show file tree
Hide file tree
Showing 15 changed files with 409 additions and 474 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.minikdc.MiniKdc;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -96,11 +95,10 @@ public class SecureTestEnvironment {

private static String testPrincipal = null;

public static void prepare(TemporaryFolder tempFolder, String... additionalPrincipals) {
private static void doPrepare(File baseDirForSecureRun, String... additionalPrincipals) {
checkArgument(additionalPrincipals != null, "Valid principals must be provided");

try {
File baseDirForSecureRun = tempFolder.newFolder();
LOG.info("Base Directory for Secure Environment: {}", baseDirForSecureRun);

Properties kdcConf = MiniKdc.createConf();
Expand Down Expand Up @@ -168,6 +166,10 @@ public static void prepare(TemporaryFolder tempFolder, String... additionalPrinc
}
}

public static void prepare(File tempFolder, String... additionalPrincipals) {
doPrepare(tempFolder, additionalPrincipals);
}

public static void cleanup() {

LOG.info("Cleaning up Secure Environment");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.yarn;

import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CliFrontendTestBase;
import org.apache.flink.client.cli.CliFrontendTestUtils;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
Expand All @@ -29,11 +28,12 @@
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;

import static org.apache.flink.client.cli.CliFrontendRunTest.verifyCliFrontend;
import static org.apache.flink.yarn.util.TestUtils.getTestJarPath;
Expand All @@ -43,22 +43,20 @@
*
* @see org.apache.flink.client.cli.CliFrontendRunTest
*/
public class CliFrontendRunWithYarnTest extends CliFrontendTestBase {

@Rule public TemporaryFolder tmp = new TemporaryFolder();
class CliFrontendRunWithYarnTest {

@BeforeClass
public static void init() {
@BeforeAll
static void init() {
CliFrontendTestUtils.pipeSystemOutToNull();
}

@AfterClass
public static void shutdown() {
@AfterAll
static void shutdown() {
CliFrontendTestUtils.restoreSystemOut();
}

@Test
public void testRun() throws Exception {
void testRun(@TempDir File tempDir) throws Exception {
String testJarPath = getTestJarPath("BatchWordCount.jar").getAbsolutePath();

Configuration configuration = new Configuration();
Expand All @@ -73,7 +71,7 @@ public void testRun() throws Exception {
new FlinkYarnSessionCli(
configuration,
testServiceLoader,
tmp.getRoot().getAbsolutePath(),
tempDir.getAbsolutePath(),
"y",
"yarn",
true);
Expand Down
43 changes: 17 additions & 26 deletions flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.yarn.configuration.YarnResourceManagerDriverConfiguration;
import org.apache.flink.yarn.util.TestUtils;

Expand All @@ -39,52 +38,43 @@
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for various utilities. */
public class UtilsTest extends TestLogger {
class UtilsTest {
private static final Logger LOG = LoggerFactory.getLogger(UtilsTest.class);

@Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
@TempDir File temporaryFolder;

@Test
public void testUberjarLocator() {
void testUberjarLocator() {
File dir = TestUtils.findFile("..", new TestUtils.RootDirFilenameFilter());
Assert.assertNotNull(dir);
Assert.assertTrue(dir.getName().endsWith(".jar"));
assertThat(dir).isNotNull();
assertThat(dir.getName()).endsWith(".jar");
dir = dir.getParentFile().getParentFile(); // from uberjar to lib to root
Assert.assertTrue(dir.exists());
Assert.assertTrue(dir.isDirectory());
List<String> files = Arrays.asList(dir.list());
Assert.assertTrue(files.contains("lib"));
Assert.assertTrue(files.contains("bin"));
Assert.assertTrue(files.contains("conf"));
assertThat(dir).exists().isDirectory();
assertThat(dir.list()).contains("lib", "bin", "conf");
}

@Test
public void testCreateTaskExecutorCredentials() throws Exception {
File root = temporaryFolder.getRoot();
void testCreateTaskExecutorCredentials() throws Exception {
File root = temporaryFolder;
File home = new File(root, "home");
boolean created = home.mkdir();
assertTrue(created);
assertThat(created).isTrue();

Configuration flinkConf = new Configuration();
YarnConfiguration yarnConf = new YarnConfiguration();
Expand Down Expand Up @@ -112,7 +102,8 @@ public void testCreateTaskExecutorCredentials() throws Exception {
final YarnResourceManagerDriverConfiguration yarnResourceManagerDriverConfiguration =
new YarnResourceManagerDriverConfiguration(env, "localhost", null);

File credentialFile = temporaryFolder.newFile("container_tokens");
File credentialFile = temporaryFolder.toPath().resolve("container_tokens").toFile();
credentialFile.createNewFile();
final Text amRmTokenKind = AMRMTokenIdentifier.KIND_NAME;
final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN");
final Text amRmTokenService = new Text("rm-ip:8030");
Expand Down Expand Up @@ -177,7 +168,7 @@ public void testCreateTaskExecutorCredentials() throws Exception {
hasHdfsDelegationToken = true;
}
}
assertTrue(hasHdfsDelegationToken);
assertFalse(hasAmRmToken);
assertThat(hasHdfsDelegationToken).isTrue();
assertThat(hasAmRmToken).isFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.io.FileNotFoundException;
import java.time.Duration;
Expand All @@ -44,19 +44,19 @@
import static org.apache.flink.yarn.util.TestUtils.getTestJarPath;

/** Test cases for the deployment of Yarn Flink application clusters. */
public class YARNApplicationITCase extends YarnTestBase {
class YARNApplicationITCase extends YarnTestBase {

private static final Duration yarnAppTerminateTimeout = Duration.ofSeconds(30);
private static final int sleepIntervalInMS = 100;

@BeforeClass
@BeforeAll
public static void setup() {
YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-application");
startYARNWithConfig(YARN_CONFIGURATION, true);
}

@Test
public void testApplicationClusterWithLocalUserJarAndFirstUserJarInclusion() throws Exception {
void testApplicationClusterWithLocalUserJarAndFirstUserJarInclusion() throws Exception {
runTest(
() ->
deployApplication(
Expand All @@ -66,8 +66,7 @@ public void testApplicationClusterWithLocalUserJarAndFirstUserJarInclusion() thr
}

@Test
public void testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion()
throws Exception {
void testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion() throws Exception {
runTest(
() ->
deployApplication(
Expand All @@ -77,7 +76,7 @@ public void testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion()
}

@Test
public void testApplicationClusterWithRemoteUserJar() throws Exception {
void testApplicationClusterWithRemoteUserJar() throws Exception {
final Path testingJar = getTestingJar();
final Path remoteJar =
new Path(miniDFSCluster.getFileSystem().getHomeDirectory(), testingJar.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,45 +37,42 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.yarn.configuration.YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Test cases for the deployment of Yarn Flink clusters with customized file replication numbers.
*/
public class YARNFileReplicationITCase extends YarnTestBase {
class YARNFileReplicationITCase extends YarnTestBase {

private static final Duration yarnAppTerminateTimeout = Duration.ofSeconds(10);
private static final int sleepIntervalInMS = 100;

@BeforeClass
@BeforeAll
public static void setup() {
YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-per-job");
startYARNWithConfig(YARN_CONFIGURATION, true);
}

@Test
public void testPerJobModeWithCustomizedFileReplication() throws Exception {
void testPerJobModeWithCustomizedFileReplication() throws Exception {
final Configuration configuration = getDefaultConfiguration();
configuration.setInteger(YarnConfigOptions.FILE_REPLICATION, 4);

runTest(() -> deployPerJob(configuration, getTestingJobGraph()));
}

@Test
public void testPerJobModeWithDefaultFileReplication() throws Exception {
void testPerJobModeWithDefaultFileReplication() throws Exception {
runTest(() -> deployPerJob(getDefaultConfiguration(), getTestingJobGraph()));
}

Expand Down Expand Up @@ -116,7 +113,7 @@ private void deployPerJob(Configuration configuration, JobGraph jobGraph) throws

final JobResult jobResult = jobResultCompletableFuture.get();

assertThat(jobResult, is(notNullValue()));
assertThat(jobResult).isNotNull();
jobResult
.getSerializedThrowable()
.ifPresent(
Expand Down Expand Up @@ -165,10 +162,10 @@ private void extraVerification(Configuration configuration, ApplicationId applic
Path uberJarHDFSPath = new Path(fs.getHomeDirectory(), suffix);

assertTrue(
fs.exists(uberJarHDFSPath),
"The Flink uber jar needs to exist. If it does not exist, then this "
+ "indicates that the Flink cluster has already terminated and Yarn has "
+ "already deleted the working directory.",
fs.exists(uberJarHDFSPath));
+ "already deleted the working directory.");

FileStatus fsStatus = fs.getFileStatus(uberJarHDFSPath);

Expand All @@ -181,6 +178,6 @@ private void extraVerification(Configuration configuration, ApplicationId applic
// If YarnConfigOptions.FILE_REPLICATION is not set. The replication number should equals to
// yarn configuration value.
int expectedReplication = flinkFileReplication > 0 ? flinkFileReplication : replication;
assertEquals(expectedReplication, fsStatus.getReplication());
assertThat((int) fsStatus.getReplication()).isEqualTo(expectedReplication);
}
}
Loading

0 comments on commit b98c66c

Please sign in to comment.