Skip to content

Commit

Permalink
[hotfix][runtime][test] Adds additional test for when the JobManagerR…
Browse files Browse the repository at this point in the history
…unner result completes exceptionally

Additionally, I moved the test's documentation into the
production code because it makes more sense to have the
reasoning over there.
  • Loading branch information
XComp committed Apr 28, 2022
1 parent 3690c3f commit d940af6
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,8 @@ private CompletableFuture<CleanupJobState> handleJobManagerRunnerResult(
JobManagerRunnerResult jobManagerRunnerResult, ExecutionType executionType) {
if (jobManagerRunnerResult.isInitializationFailure()
&& executionType == ExecutionType.RECOVERY) {
// fail fatally to make the Dispatcher fail-over and recover all jobs once more (which
// can only happen in HA mode)
return CompletableFuture.completedFuture(
jobManagerRunnerFailed(
jobManagerRunnerResult.getExecutionGraphInfo().getJobId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -663,13 +664,35 @@ private URI createTestingSavepoint() throws IOException, URISyntaxException {
return new URI(completedCheckpointStorageLocation.getExternalPointer());
}

/**
* Tests that the {@link Dispatcher} fails fatally if the recovered jobs cannot be started. See
* FLINK-9097.
*/
@Test
public void testFatalErrorIfRecoveredJobsCannotBeStarted() throws Exception {
final FlinkException testException = new FlinkException("Test exception");
testJobManagerRunnerFailureResultingInFatalError(
(testingJobManagerRunner, actualError) ->
testingJobManagerRunner.completeResultFuture(
// Let the initialization of the JobManagerRunner fail
JobManagerRunnerResult.forInitializationFailure(
new ExecutionGraphInfo(
ArchivedExecutionGraph
.createSparseArchivedExecutionGraph(
jobId,
jobGraph.getName(),
JobStatus.FAILED,
actualError,
jobGraph.getCheckpointingSettings(),
1L)),
actualError)));
}

@Test
public void testFatalErrorIfSomeOtherErrorCausedTheJobMasterToFail() throws Exception {
testJobManagerRunnerFailureResultingInFatalError(
TestingJobManagerRunner::completeResultFutureExceptionally);
}

private void testJobManagerRunnerFailureResultingInFatalError(
BiConsumer<TestingJobManagerRunner, Exception> jobManagerRunnerWithErrorConsumer)
throws Exception {
final FlinkException testException = new FlinkException("Expected test exception");
jobMasterLeaderElectionService.isLeader(UUID.randomUUID());

final TestingJobMasterServiceLeadershipRunnerFactory jobManagerRunnerFactory =
Expand All @@ -685,21 +708,8 @@ public void testFatalErrorIfRecoveredJobsCannotBeStarted() throws Exception {
final TestingFatalErrorHandler fatalErrorHandler =
testingFatalErrorHandlerResource.getFatalErrorHandler();

final TestingJobManagerRunner testingJobManagerRunner =
jobManagerRunnerFactory.takeCreatedJobManagerRunner();

// Let the initialization of the JobManagerRunner fail
testingJobManagerRunner.completeResultFuture(
JobManagerRunnerResult.forInitializationFailure(
new ExecutionGraphInfo(
ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
jobId,
jobGraph.getName(),
JobStatus.FAILED,
testException,
jobGraph.getCheckpointingSettings(),
1L)),
testException));
jobManagerRunnerWithErrorConsumer.accept(
jobManagerRunnerFactory.takeCreatedJobManagerRunner(), testException);

final Throwable error =
fatalErrorHandler
Expand Down

0 comments on commit d940af6

Please sign in to comment.