Skip to content

Commit

Permalink
[FLINK-32852][JUnit5 Migration] Migrate the scheduler package of flin…
Browse files Browse the repository at this point in the history
…k-runtime module to JUnit5 (apache#24732)
  • Loading branch information
RocMarshal authored May 7, 2024
1 parent 1904b21 commit ca441b8
Show file tree
Hide file tree
Showing 53 changed files with 1,062 additions and 1,054 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,10 @@
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.TestLoggerExtension;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
Expand All @@ -60,7 +58,6 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link DefaultExecutionDeployer}. */
@ExtendWith(TestLoggerExtension.class)
class DefaultExecutionDeployerTest {

private ScheduledExecutorService executor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
Expand All @@ -45,17 +44,17 @@
import org.apache.flink.runtime.shuffle.ShuffleTestUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.traces.Span;
import org.apache.flink.traces.SpanBuilder;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.clock.SystemClock;

import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

Expand All @@ -66,49 +65,57 @@
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;

import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for the {@link DefaultExecutionGraphFactory}. */
public class DefaultExecutionGraphFactoryTest extends TestLogger {
class DefaultExecutionGraphFactoryTest {

@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
private static final Logger log =
LoggerFactory.getLogger(DefaultExecutionGraphFactoryTest.class);

@ClassRule
public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
TestingUtils.defaultExecutorResource();
@TempDir private File tempDir;
private File temporaryFile;

@RegisterExtension
private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =
TestingUtils.defaultExecutorExtension();

@BeforeEach
private void setup() {
temporaryFile = new File(tempDir.getAbsolutePath(), "stateFile");
}

@Test
public void testRestoringModifiedJobFromSavepointFails() throws Exception {
void testRestoringModifiedJobFromSavepointFails() throws Exception {
final JobGraph jobGraphWithNewOperator = createJobGraphWithSavepoint(false, 42L, 1);

final ExecutionGraphFactory executionGraphFactory = createExecutionGraphFactory();

try {
executionGraphFactory.createAndRestoreExecutionGraph(
jobGraphWithNewOperator,
new StandaloneCompletedCheckpointStore(1),
new CheckpointsCleaner(),
new StandaloneCheckpointIDCounter(),
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN,
0L,
new DefaultVertexAttemptNumberStore(),
SchedulerBase.computeVertexParallelismStore(jobGraphWithNewOperator),
(execution, previousState, newState) -> {},
rp -> false,
log);
fail("Expected ExecutionGraph creation to fail because of non restored state.");
} catch (Exception e) {
assertThat(
e, FlinkMatchers.containsMessage("Failed to rollback to checkpoint/savepoint"));
}
assertThatThrownBy(
() ->
executionGraphFactory.createAndRestoreExecutionGraph(
jobGraphWithNewOperator,
new StandaloneCompletedCheckpointStore(1),
new CheckpointsCleaner(),
new StandaloneCheckpointIDCounter(),
TaskDeploymentDescriptorFactory.PartitionLocationConstraint
.CAN_BE_UNKNOWN,
0L,
new DefaultVertexAttemptNumberStore(),
SchedulerBase.computeVertexParallelismStore(
jobGraphWithNewOperator),
(execution, previousState, newState) -> {},
rp -> false,
log))
.withFailMessage(
"Expected ExecutionGraph creation to fail because of non restored state.")
.isInstanceOf(Exception.class)
.hasMessageContaining("Failed to rollback to checkpoint/savepoint");
}

@Test
public void testRestoringModifiedJobFromSavepointWithAllowNonRestoredStateSucceeds()
throws Exception {
void testRestoringModifiedJobFromSavepointWithAllowNonRestoredStateSucceeds() throws Exception {
// create savepoint data
final long savepointId = 42L;
final JobGraph jobGraphWithNewOperator = createJobGraphWithSavepoint(true, savepointId, 1);
Expand All @@ -132,13 +139,12 @@ public void testRestoringModifiedJobFromSavepointWithAllowNonRestoredStateSuccee

final CompletedCheckpoint savepoint = completedCheckpointStore.getLatestCheckpoint();

MatcherAssert.assertThat(savepoint, notNullValue());

MatcherAssert.assertThat(savepoint.getCheckpointID(), Matchers.is(savepointId));
assertThat(savepoint).isNotNull();
assertThat(savepoint.getCheckpointID()).isEqualTo(savepointId);
}

@Test
public void testCheckpointStatsTrackerUpdatedWithNewParallelism() throws Exception {
void testCheckpointStatsTrackerUpdatedWithNewParallelism() throws Exception {
final long savepointId = 42L;
final JobGraph jobGraphWithParallelism2 = createJobGraphWithSavepoint(true, savepointId, 2);

Expand Down Expand Up @@ -184,7 +190,7 @@ public void addSpan(SpanBuilder spanBuilder) {
SystemClock.getInstance().absoluteTimeMillis())
.build());

MatcherAssert.assertThat(spans, hasSize(1));
assertThat(spans).hasSize(1);
}

@Nonnull
Expand All @@ -201,8 +207,8 @@ private ExecutionGraphFactory createExecutionGraphFactory(
new Configuration(),
ClassLoader.getSystemClassLoader(),
new DefaultExecutionDeploymentTracker(),
EXECUTOR_RESOURCE.getExecutor(),
EXECUTOR_RESOURCE.getExecutor(),
EXECUTOR_EXTENSION.getExecutor(),
EXECUTOR_EXTENSION.getExecutor(),
Time.milliseconds(0L),
metricGroup,
VoidBlobWriter.getInstance(),
Expand All @@ -217,8 +223,7 @@ private JobGraph createJobGraphWithSavepoint(
// create savepoint data
final OperatorID operatorID = new OperatorID();
final File savepointFile =
TestUtils.createSavepointWithOperatorState(
TEMPORARY_FOLDER.newFile(), savepointId, operatorID);
TestUtils.createSavepointWithOperatorState(temporaryFile, savepointId, operatorID);

// set savepoint settings which don't allow non restored state
final SavepointRestoreSettings savepointRestoreSettings =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,28 @@
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.TestingOperatorCoordinator;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.SerializedValue;

import org.junit.ClassRule;
import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;

import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
import static org.apache.flink.runtime.jobgraph.DistributionPattern.ALL_TO_ALL;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link DefaultOperatorCoordinatorHandler}. */
public class DefaultOperatorCoordinatorHandlerTest {
@ClassRule
public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
TestingUtils.defaultExecutorResource();
class DefaultOperatorCoordinatorHandlerTest {
@RegisterExtension
private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =
TestingUtils.defaultExecutorExtension();

@Test
public void testRegisterAndStartNewCoordinators() throws Exception {
void testRegisterAndStartNewCoordinators() throws Exception {

final JobVertex[] jobVertices = createJobVertices(BLOCKING);
OperatorID operatorId1 = OperatorID.fromJobVertexID(jobVertices[0].getID());
Expand All @@ -67,16 +66,15 @@ public void testRegisterAndStartNewCoordinators() throws Exception {

DefaultOperatorCoordinatorHandler handler =
new DefaultOperatorCoordinatorHandler(executionGraph, throwable -> {});
assertThat(handler.getCoordinatorMap().keySet(), containsInAnyOrder(operatorId1));
assertThat(handler.getCoordinatorMap().keySet()).contains(operatorId1);

executionGraph.initializeJobVertex(ejv2, 0L);
handler.registerAndStartNewCoordinators(
ejv2.getOperatorCoordinators(),
executionGraph.getJobMasterMainThreadExecutor(),
ejv2.getParallelism());

assertThat(
handler.getCoordinatorMap().keySet(), containsInAnyOrder(operatorId1, operatorId2));
assertThat(handler.getCoordinatorMap().keySet()).contains(operatorId1, operatorId2);
}

private JobVertex[] createJobVertices(ResultPartitionType resultPartitionType)
Expand All @@ -103,6 +101,6 @@ private JobVertex[] createJobVertices(ResultPartitionType resultPartitionType)
private DefaultExecutionGraph createDynamicGraph(JobVertex... jobVertices) throws Exception {
return TestingDefaultExecutionGraphBuilder.newBuilder()
.setJobGraph(new JobGraph(new JobID(), "TestJob", jobVertices))
.buildDynamicGraph(EXECUTOR_RESOURCE.getExecutor());
.buildDynamicGraph(EXECUTOR_EXTENSION.getExecutor());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,13 @@
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.TestLogger;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.function.CheckedSupplier;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -60,32 +59,31 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.fail;
import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
import static org.assertj.core.api.Assertions.fail;

/** Tests base for the scheduling of batch jobs. */
public class DefaultSchedulerBatchSchedulingTest extends TestLogger {
class DefaultSchedulerBatchSchedulingTest {

protected final Logger log = LoggerFactory.getLogger(getClass());

@ClassRule
public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
TestingUtils.defaultExecutorResource();
@RegisterExtension
private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =
TestingUtils.defaultExecutorExtension();

private static ScheduledExecutorService singleThreadScheduledExecutorService;
private static ComponentMainThreadExecutor mainThreadExecutor;

@BeforeClass
public static void setupClass() {
@BeforeAll
private static void setupClass() {
singleThreadScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
mainThreadExecutor =
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
singleThreadScheduledExecutorService);
}

@AfterClass
public static void teardownClass() {
@AfterAll
private static void teardownClass() {
if (singleThreadScheduledExecutorService != null) {
singleThreadScheduledExecutorService.shutdownNow();
}
Expand All @@ -96,7 +94,7 @@ public static void teardownClass() {
* for more information.
*/
@Test
public void testSchedulingOfJobWithFewerSlotsThanParallelism() throws Exception {
void testSchedulingOfJobWithFewerSlotsThanParallelism() throws Exception {
final int parallelism = 5;
final Time batchSlotTimeout = Time.milliseconds(5L);
final JobGraph jobGraph = createBatchJobGraph(parallelism);
Expand Down Expand Up @@ -159,7 +157,7 @@ public void testSchedulingOfJobWithFewerSlotsThanParallelism() throws Exception
}
}

assertThat(terminationFuture.get(), is(JobStatus.FINISHED));
assertThatFuture(terminationFuture).isCompletedWithValue(JobStatus.FINISHED);
}
}

Expand Down Expand Up @@ -206,7 +204,7 @@ private SchedulerNG createScheduler(
JobStatusListener jobStatusListener)
throws Exception {
return new DefaultSchedulerBuilder(
jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
jobGraph, mainThreadExecutor, EXECUTOR_EXTENSION.getExecutor())
.setExecutionSlotAllocatorFactory(
SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
physicalSlotProvider, slotRequestTimeout))
Expand Down
Loading

0 comments on commit ca441b8

Please sign in to comment.