Skip to content

Commit

Permalink
[FLINK-32835][runtime] Migrate unit tests in "accumulators" and "blob…
Browse files Browse the repository at this point in the history
…" packages to JUnit5
  • Loading branch information
ferenc-csaky authored and 1996fanrui committed Aug 28, 2023
1 parent a68dd41 commit 3ef7132
Show file tree
Hide file tree
Showing 31 changed files with 1,694 additions and 1,901 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,9 @@
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.examples.java.wordcount.WordCount;
import org.apache.flink.runtime.blob.BlobCacheCorruptionTest;
import org.apache.flink.runtime.blob.BlobCacheRecoveryTest;
import org.apache.flink.runtime.blob.BlobServerCorruptionTest;
import org.apache.flink.runtime.blob.BlobServerRecoveryTest;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.TestingBlobHelpers;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.util.OperatingSystem;
Expand Down Expand Up @@ -215,7 +212,7 @@ public void testBlobServerRecovery() throws Exception {
BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);

try {
BlobServerRecoveryTest.testBlobServerRecovery(
TestingBlobHelpers.testBlobServerRecovery(
config, blobStoreService, temporaryFolder.newFolder());
} finally {
blobStoreService.closeAndCleanupAllData();
Expand All @@ -238,7 +235,7 @@ public void testBlobServerCorruptedFile() throws Exception {
BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);

try {
BlobServerCorruptionTest.testGetFailsFromCorruptFile(
TestingBlobHelpers.testGetFailsFromCorruptFile(
config, blobStoreService, temporaryFolder.newFolder());
} finally {
blobStoreService.closeAndCleanupAllData();
Expand All @@ -261,7 +258,7 @@ public void testBlobCacheRecovery() throws Exception {
BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);

try {
BlobCacheRecoveryTest.testBlobCacheRecovery(
TestingBlobHelpers.testBlobCacheRecovery(
config, blobStoreService, temporaryFolder.newFolder());
} finally {
blobStoreService.closeAndCleanupAllData();
Expand All @@ -284,7 +281,7 @@ public void testBlobCacheCorruptedFile() throws Exception {
BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);

try {
BlobCacheCorruptionTest.testGetFailsFromCorruptFile(
TestingBlobHelpers.testGetFailsFromCorruptFile(
new JobID(), config, blobStoreService, temporaryFolder.newFolder());
} finally {
blobStoreService.closeAndCleanupAllData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,42 +25,41 @@
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.OptionalFailure;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for the {@link StringifiedAccumulatorResult}. */
public class StringifiedAccumulatorResultTest {
class StringifiedAccumulatorResultTest {

@Test
public void testSerialization() throws IOException {
void testSerialization() throws IOException {
final String name = "a";
final String type = "b";
final String value = "c";
final StringifiedAccumulatorResult original =
new StringifiedAccumulatorResult(name, type, value);

// Confirm no funny business in the constructor to getter pathway
assertEquals(name, original.getName());
assertEquals(type, original.getType());
assertEquals(value, original.getValue());
assertThat(original.getName()).isEqualTo(name);
assertThat(original.getType()).isEqualTo(type);
assertThat(original.getValue()).isEqualTo(value);

final StringifiedAccumulatorResult copy = CommonTestUtils.createCopySerializable(original);

// Copy should have equivalent core fields
assertEquals(name, copy.getName());
assertEquals(type, copy.getType());
assertEquals(value, copy.getValue());
assertThat(copy.getName()).isEqualTo(name);
assertThat(copy.getType()).isEqualTo(type);
assertThat(copy.getValue()).isEqualTo(value);
}

@Test
public void stringifyingResultsShouldIncorporateAccumulatorLocalValueDirectly() {
void stringifyingResultsShouldIncorporateAccumulatorLocalValueDirectly() {
final String name = "a";
final int targetValue = 314159;
final IntCounter acc = new IntCounter();
Expand All @@ -71,16 +70,16 @@ public void stringifyingResultsShouldIncorporateAccumulatorLocalValueDirectly()
final StringifiedAccumulatorResult[] results =
StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);

assertEquals(1, results.length);
assertThat(results).hasSize(1);

final StringifiedAccumulatorResult firstResult = results[0];
assertEquals(name, firstResult.getName());
assertEquals("IntCounter", firstResult.getType());
assertEquals(Integer.toString(targetValue), firstResult.getValue());
assertThat(firstResult.getName()).isEqualTo(name);
assertThat(firstResult.getType()).isEqualTo("IntCounter");
assertThat(firstResult.getValue()).isEqualTo(Integer.toString(targetValue));
}

@Test
public void stringifyingResultsShouldReportNullLocalValueAsNonnullValueString() {
void stringifyingResultsShouldReportNullLocalValueAsNonnullValueString() {
final String name = "a";
final NullBearingAccumulator acc = new NullBearingAccumulator();
final Map<String, OptionalFailure<Accumulator<?, ?>>> accumulatorMap = new HashMap<>();
Expand All @@ -89,52 +88,50 @@ public void stringifyingResultsShouldReportNullLocalValueAsNonnullValueString()
final StringifiedAccumulatorResult[] results =
StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);

assertEquals(1, results.length);
assertThat(results).hasSize(1);

// Note the use of a String with a content of "null" rather than a null value
final StringifiedAccumulatorResult firstResult = results[0];
assertEquals(name, firstResult.getName());
assertEquals("NullBearingAccumulator", firstResult.getType());
assertEquals("null", firstResult.getValue());
assertThat(firstResult.getName()).isEqualTo(name);
assertThat(firstResult.getType()).isEqualTo("NullBearingAccumulator");
assertThat(firstResult.getValue()).isEqualTo("null");
}

@Test
public void stringifyingResultsShouldReportNullAccumulatorWithNonnullValueAndTypeString() {
void stringifyingResultsShouldReportNullAccumulatorWithNonnullValueAndTypeString() {
final String name = "a";
final Map<String, OptionalFailure<Accumulator<?, ?>>> accumulatorMap = new HashMap<>();
accumulatorMap.put(name, null);

final StringifiedAccumulatorResult[] results =
StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);

assertEquals(1, results.length);
assertThat(results).hasSize(1);

// Note the use of String values with content of "null" rather than null values
final StringifiedAccumulatorResult firstResult = results[0];
assertEquals(name, firstResult.getName());
assertEquals("null", firstResult.getType());
assertEquals("null", firstResult.getValue());
assertThat(firstResult.getName()).isEqualTo(name);
assertThat(firstResult.getType()).isEqualTo("null");
assertThat(firstResult.getValue()).isEqualTo("null");
}

@Test
public void stringifyingFailureResults() {
void stringifyingFailureResults() {
final String name = "a";
final Map<String, OptionalFailure<Accumulator<?, ?>>> accumulatorMap = new HashMap<>();
accumulatorMap.put(name, OptionalFailure.ofFailure(new FlinkRuntimeException("Test")));

final StringifiedAccumulatorResult[] results =
StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);

assertEquals(1, results.length);
assertThat(results).hasSize(1);

// Note the use of String values with content of "null" rather than null values
final StringifiedAccumulatorResult firstResult = results[0];
assertEquals(name, firstResult.getName());
assertEquals("null", firstResult.getType());
assertTrue(
firstResult
.getValue()
.startsWith("org.apache.flink.util.FlinkRuntimeException: Test"));
assertThat(firstResult.getName()).isEqualTo(name);
assertThat(firstResult.getType()).isEqualTo("null");
assertThat(firstResult.getValue())
.startsWith("org.apache.flink.util.FlinkRuntimeException: Test");
}

private static class NullBearingAccumulator implements SimpleAccumulator<Serializable> {
Expand Down
Loading

0 comments on commit 3ef7132

Please sign in to comment.