Skip to content

Commit

Permalink
[FLINK-27882][tests][table] Migrate flink-scala to JUnit5
Browse files Browse the repository at this point in the history
  • Loading branch information
snuyanzin authored Nov 20, 2023
1 parent e68411a commit 278504a
Show file tree
Hide file tree
Showing 32 changed files with 813 additions and 867 deletions.
6 changes: 6 additions & 0 deletions flink-scala/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ under the License.
<goals>
<goal>test-jar</goal>
</goals>
<configuration>
<excludes>
<!-- test-jar is still used by JUnit4 modules -->
<exclude>META-INF/services/org.junit.jupiter.api.extension.Extension</exclude>
</excludes>
</configuration>
</execution>
</executions>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
Expand All @@ -35,50 +33,49 @@

import scala.Tuple3;

import static org.junit.Assert.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link ScalaCsvOutputFormat}. */
public class ScalaCsvOutputFormatTest {
class ScalaCsvOutputFormatTest {

private String path;
private ScalaCsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat;

@Rule public final TemporaryFolder tmpFolder = new TemporaryFolder();
@TempDir private java.nio.file.Path tmpFolder;

@Before
public void setUp() throws Exception {
path = tmpFolder.newFile().getAbsolutePath();
@BeforeEach
void setUp() throws Exception {
path = tmpFolder.toFile().getAbsolutePath();
csvOutputFormat = new ScalaCsvOutputFormat<>(new Path(path));
csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
csvOutputFormat.open(0, 1);
}

@Test
public void testNullAllow() throws Exception {
void testNullAllow() throws Exception {
try {
csvOutputFormat.setAllowNullValues(true);
csvOutputFormat.writeRecord(new Tuple3<>("One", null, 8));
} finally {
csvOutputFormat.close();
}
java.nio.file.Path p = Paths.get(path);
Assert.assertTrue(Files.exists(p));
assertThat(p).exists();
List<String> lines = Files.readAllLines(Paths.get(path), StandardCharsets.UTF_8);
Assert.assertEquals(1, lines.size());
Assert.assertEquals("One,,8", lines.get(0));
assertThat(lines).hasSize(1);
assertThat(lines.get(0)).isEqualTo("One,,8");
}

@Test
public void testNullDisallowOnDefault() throws Exception {
try {
csvOutputFormat.setAllowNullValues(false);
csvOutputFormat.writeRecord(new Tuple3<>("One", null, 8));
fail("should fail with an exception");
} catch (RuntimeException e) {
// expected
} finally {
csvOutputFormat.close();
}
void testNullDisallowOnDefault() throws Exception {
assertThatThrownBy(
() -> {
csvOutputFormat.setAllowNullValues(false);
csvOutputFormat.writeRecord(new Tuple3<>("One", null, 8));
})
.isInstanceOf(RuntimeException.class);
csvOutputFormat.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,13 @@

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Registration;
import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

/**
* Tests that the set of Kryo registrations is the same across compatible Flink versions.
Expand All @@ -44,7 +38,7 @@
* Kryo serializer itself sits, because when runtime is present in the classpath, Chill is used to
* instantiate Kryo and adds the proper set of registrations.
*/
public class KryoSerializerRegistrationsTest {
class KryoSerializerRegistrationsTest {

/**
* Tests that the registered classes in Kryo did not change.
Expand All @@ -53,7 +47,7 @@ public class KryoSerializerRegistrationsTest {
* change in the serializers can break savepoint backwards compatibility between Flink versions.
*/
@Test
public void testDefaultKryoRegisteredClassesDidNotChange() throws Exception {
void testDefaultKryoRegisteredClassesDidNotChange() throws Exception {
final Kryo kryo = new KryoSerializer<>(Integer.class, new ExecutionConfig()).getKryo();

try (BufferedReader reader =
Expand All @@ -78,10 +72,9 @@ public void testDefaultKryoRegisteredClassesDidNotChange() throws Exception {
// only available if flink-avro is present. There is a special version of
// this test in AvroKryoSerializerRegistrationsTest that verifies correct
// registration of Avro types if present
assertThat(
registration.getType().getName(),
is(
"org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroRegisteredClass"));
assertThat(registration.getType().getName())
.isEqualTo(
"org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroRegisteredClass");
} else if (!registeredClass.equals(registration.getType().getName())) {
fail(
String.format(
Expand All @@ -91,41 +84,4 @@ public void testDefaultKryoRegisteredClassesDidNotChange() throws Exception {
}
}
}

/**
* Creates a Kryo serializer and writes the default registrations out to a comma separated file
* with one entry per line:
*
* <pre>
* id,class
* </pre>
*
* <p>The produced file is used to check that the registered IDs don't change in future Flink
* versions.
*
* <p>This method is not used in the tests, but documents how the test file has been created and
* can be used to re-create it if needed.
*
* @param filePath File path to write registrations to
*/
private void writeDefaultKryoRegistrations(String filePath) throws IOException {
final File file = new File(filePath);
if (file.exists()) {
assertTrue(file.delete());
}

final Kryo kryo = new KryoSerializer<>(Integer.class, new ExecutionConfig()).getKryo();
final int nextId = kryo.getNextRegistrationId();

try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
for (int i = 0; i < nextId; i++) {
Registration registration = kryo.getRegistration(i);
String str = registration.getId() + "," + registration.getType().getName();
writer.write(str, 0, str.length());
writer.newLine();
}

System.out.println("Created file with registrations at " + file.getAbsolutePath());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.flink.util.TestLoggerExtension
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ package org.apache.flink.api.scala
import org.apache.flink.api.common.InvalidProgramException
import org.apache.flink.api.java.io.DiscardingOutputFormat

import org.junit.Test
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.jupiter.api.Test

// Verify that the sanity checking in delta iterations works. We just
// have a dummy job that is not meant to be executed. Only verify that
Expand Down Expand Up @@ -57,49 +58,60 @@ class DeltaIterationSanityCheckTest extends Serializable {
iteration.output(new DiscardingOutputFormat[(Int, String)])
}

@Test(expected = classOf[InvalidProgramException])
@Test
def testIncorrectJoinWithSolution1(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val solutionInput = env.fromElements((1, "1"))
val worksetInput = env.fromElements((2, "2"))

val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) {
(s, ws) =>
val result = s.join(ws).where("_2").equalTo("_2")((l, r) => l)
(result, ws)
}

iteration.output(new DiscardingOutputFormat[(Int, String)])
assertThatThrownBy(
() => {
val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) {
(s, ws) =>
val result = s.join(ws).where("_2").equalTo("_2")((l, r) => l)
(result, ws)
}

iteration.output(new DiscardingOutputFormat[(Int, String)])
})
.isInstanceOf(classOf[InvalidProgramException])
}

@Test(expected = classOf[InvalidProgramException])
@Test
def testIncorrectJoinWithSolution2(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val solutionInput = env.fromElements((1, "1"))
val worksetInput = env.fromElements((2, "2"))

val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) {
(s, ws) =>
val result = ws.join(s).where("_2").equalTo("_2")((l, r) => l)
(result, ws)
}

iteration.output(new DiscardingOutputFormat[(Int, String)])
assertThatThrownBy(
() => {
val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) {
(s, ws) =>
val result = ws.join(s).where("_2").equalTo("_2")((l, r) => l)
(result, ws)
}

iteration.output(new DiscardingOutputFormat[(Int, String)])
})
.isInstanceOf(classOf[InvalidProgramException])
}

@Test(expected = classOf[InvalidProgramException])
@Test
def testIncorrectJoinWithSolution3(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val solutionInput = env.fromElements((1, "1"))
val worksetInput = env.fromElements((2, "2"))

val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_2")) {
(s, ws) =>
val result = ws.join(s).where("_1").equalTo("_1")((l, r) => l)
(result, ws)
}

iteration.output(new DiscardingOutputFormat[(Int, String)])
assertThatThrownBy(
() => {
val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_2")) {
(s, ws) =>
val result = ws.join(s).where("_1").equalTo("_1")((l, r) => l)
(result, ws)
}

iteration.output(new DiscardingOutputFormat[(Int, String)])
})
.isInstanceOf(classOf[InvalidProgramException])
}

@Test
Expand Down Expand Up @@ -132,48 +144,57 @@ class DeltaIterationSanityCheckTest extends Serializable {
iteration.output(new DiscardingOutputFormat[(Int, String)])
}

@Test(expected = classOf[InvalidProgramException])
@Test
def testIncorrectCoGroupWithSolution1(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val solutionInput = env.fromElements((1, "1"))
val worksetInput = env.fromElements((2, "2"))

val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) {
(s, ws) =>
val result = s.coGroup(ws).where("_2").equalTo("_2")((l, r) => l.min)
(result, ws)
}
assertThatThrownBy(
() => {
val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) {
(s, ws) =>
val result = s.coGroup(ws).where("_2").equalTo("_2")((l, r) => l.min)
(result, ws)
}

iteration.output(new DiscardingOutputFormat[(Int, String)])
iteration.output(new DiscardingOutputFormat[(Int, String)])
}).isInstanceOf(classOf[InvalidProgramException])
}

@Test(expected = classOf[InvalidProgramException])
@Test
def testIncorrectCoGroupWithSolution2(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val solutionInput = env.fromElements((1, "1"))
val worksetInput = env.fromElements((2, "2"))

val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) {
(s, ws) =>
val result = ws.coGroup(s).where("_2").equalTo("_2")((l, r) => l.min)
(result, ws)
}
assertThatThrownBy(
() => {
val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) {
(s, ws) =>
val result = ws.coGroup(s).where("_2").equalTo("_2")((l, r) => l.min)
(result, ws)
}

iteration.output(new DiscardingOutputFormat[(Int, String)])
iteration.output(new DiscardingOutputFormat[(Int, String)])
}).isInstanceOf(classOf[InvalidProgramException])
}

@Test(expected = classOf[InvalidProgramException])
@Test
def testIncorrectCoGroupWithSolution3(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val solutionInput = env.fromElements((1, "1"))
val worksetInput = env.fromElements((2, "2"))

val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_2")) {
(s, ws) =>
val result = ws.coGroup(s).where("_1").equalTo("_1")((l, r) => l.min)
(result, ws)
}
assertThatThrownBy(
() => {
val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_2")) {
(s, ws) =>
val result = ws.coGroup(s).where("_1").equalTo("_1")((l, r) => l.min)
(result, ws)
}

iteration.output(new DiscardingOutputFormat[(Int, String)])
iteration.output(new DiscardingOutputFormat[(Int, String)])
}).isInstanceOf(classOf[InvalidProgramException])
}
}
Loading

0 comments on commit 278504a

Please sign in to comment.